Sharding MongoDB using Range strategy

Hi 👋👋

In this article I will explore sharding a MongoDB database that runs on Kubernetes. Before we get started: if you want to follow along, please install the tools listed in the prerequisites section, and if you want to learn more about sharding, check out this fantastic article, Sharding Pattern.

Prerequisites

Introduction

Let’s install a sharded MongoDB cluster on Kubernetes using Helm.

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-mongo bitnami/mongodb-sharded

After the installation completes, save the database’s root password and replica set key. While doing this the first time I messed up and didn’t save them properly.

Run the following commands to print the password and replica set key on the command line. If you’re on Windows, I have provided a PowerShell function for base64 at the end of this post; if you’re on Unix, don’t forget to pass --decode to base64.

kubectl get secret --namespace default my-mongo-mongodb-sharded -o jsonpath="{.data.mongodb-root-password}" | base64
kubectl get secret --namespace default my-mongo-mongodb-sharded -o jsonpath="{.data.mongodb-replica-set-key}" | base64
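If you’d rather not depend on the base64 binary at all, the decoding step can be sketched in a few lines of Python. This is only an illustration: decode_secret is a name I made up, and it assumes you paste in the raw value that kubectl prints before any decoding.

```python
import base64


def decode_secret(encoded: str) -> str:
    """Decode one base64-encoded value from a Kubernetes Secret's .data map."""
    return base64.b64decode(encoded).decode("utf-8")


if __name__ == "__main__":
    # Round-trip a made-up value to show the encoding Kubernetes applies.
    sample = base64.b64encode(b"not-a-real-password").decode("ascii")
    print(decode_secret(sample))
```

Kubernetes stores every Secret value base64-encoded, which is why both the password and the replica set key need this step.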

Sharding the Database

Verify that all your pods are running and start a shell connection to the mongos server.

	@denis ➜ ~ kubectl get pods
	NAME                                              READY   STATUS    RESTARTS   AGE
	my-mongo-mongodb-sharded-configsvr-0              1/1     Running   0          3m8s
	my-mongo-mongodb-sharded-configsvr-1              1/1     Running   0          116s
	my-mongo-mongodb-sharded-mongos-c4dd66768-dqlbv   1/1     Running   0          3m8s
	my-mongo-mongodb-sharded-shard0-data-0            1/1     Running   0          3m8s
	my-mongo-mongodb-sharded-shard0-data-1            1/1     Running   0          103s
	my-mongo-mongodb-sharded-shard1-data-0            1/1     Running   0          3m8s
	my-mongo-mongodb-sharded-shard1-data-1            1/1     Running   0          93s

kubectl port-forward --namespace default svc/my-mongo-mongodb-sharded 27017:27017
# and in another terminal:
mongosh --host 127.0.0.1 --authenticationDatabase admin -u root -p $MONGODB_ROOT_PASSWORD

By running sh.status() you should get an output which contains two mongo shards:

shards
[
  {
    _id: 'my-mongo-mongodb-sharded-shard-0',
    host: 'my-mongo-mongodb-sharded-shard-0/my-mongo-mongodb-sharded-shard0-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard0-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1
  },
  {
    _id: 'my-mongo-mongodb-sharded-shard-1',
    host: 'my-mongo-mongodb-sharded-shard-1/my-mongo-mongodb-sharded-shard1-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard1-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1
  }
]

Before enabling sharding on the database and the collection, I’m going to insert some dummy data into the my_users collection of the my_data database. The script used to insert the data is attached at the end of this blog post.

[direct: mongos]> sh.enableSharding("my_data")
{
  ok: 1,
  operationTime: Timestamp(3, 1628345449),
  '$clusterTime': {
    clusterTime: Timestamp(3, 1628345449),
    signature: {
      hash: Binary(Buffer.from("e57c8c37047f7aa170fb59f6b11e22aa65159a30", "hex"), 0),
      keyId: Long("6993682727694237708")
    }
  }
}

[direct: mongos]> use my_data
[direct: mongos]> db.my_users.createIndex({"t": 1})
[direct: mongos]> sh.shardCollection("my_data.my_users", { "t": 1 })

sh.addShardToZone("my-mongo-mongodb-sharded-shard-1", "TSR1")
sh.addShardToZone("my-mongo-mongodb-sharded-shard-0", "TSR2")
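The shell helpers above are thin wrappers around admin commands, so the same steps can be driven from Python. Here is a rough sketch, assuming a pymongo.MongoClient connected to mongos is passed in as client; the function names are mine and not part of any library.

```python
from __future__ import annotations

from typing import Any


def sharding_commands(database: str, collection: str, key: str) -> list[dict[str, Any]]:
    """Build the admin commands behind sh.enableSharding and sh.shardCollection."""
    namespace = f"{database}.{collection}"
    return [
        {"enableSharding": database},
        {"shardCollection": namespace, "key": {key: 1}},
    ]


def zone_commands(shard_to_zone: dict[str, str]) -> list[dict[str, Any]]:
    """Build one addShardToZone admin command per shard."""
    return [
        {"addShardToZone": shard, "zone": zone}
        for shard, zone in shard_to_zone.items()
    ]


def run_all(client: Any, commands: list[dict[str, Any]]) -> None:
    """Send each command to the admin database through mongos.

    `client` is assumed to be a pymongo.MongoClient connected to mongos.
    """
    for command in commands:
        client.admin.command(command)
```

Keeping the command documents separate from the code that sends them makes it easy to print and review them before touching the cluster.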

If you’ve made it this far, congrats, you’ve enabled sharding! Now let’s define some rules.

Since we’re going to use a range sharding strategy based on the key t, and I have two shards available, I would like my data to be distributed in the following way:

 sh.updateZoneKeyRange("my_data.my_users", {t: 46}, {t: MaxKey()}, "TSR2")
 sh.updateZoneKeyRange("my_data.my_users", {t: MinKey()}, {t: 46}, "TSR1")

Note: the label on the TSR2 zone is wrong; the correct value is 46 ≤ t < 1000.
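Zone ranges include their lower bound and exclude their upper bound, which is why 46 appears in both commands without any conflict. A small Python sketch of the two ranges makes this concrete. It uses infinities as stand-ins for MinKey() and MaxKey(), an assumption that only works because t is numeric here.

```python
import math

# Stand-ins for MinKey()/MaxKey(); an assumption that holds only for numeric t.
MIN_KEY = -math.inf
MAX_KEY = math.inf

ZONES = [
    ("TSR1", MIN_KEY, 46),
    ("TSR2", 46, MAX_KEY),
]


def zone_for(t, zones=ZONES):
    """Return the zone whose half-open range [low, high) contains t, or None."""
    for name, low, high in zones:
        if low <= t < high:
            return name
    return None
```

With these ranges, a document with t = 45 belongs to TSR1 and one with t = 46 belongs to TSR2.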

Running sh.status() should now yield the following output.

    collections: {
      'my_data.my_users': {
        shardKey: { t: 1 },
        unique: false,
        balancing: true,
        chunkMetadata: { shard: 'my-mongo-mongodb-sharded-shard-1', nChunks: 3 },
        chunks: [
          {
            min: { t: MinKey() },
            max: { t: 45 },
            'on shard': 'my-mongo-mongodb-sharded-shard-1',
            'last modified': Timestamp(2, 1)
          },
          {
            min: { t: 46 },
            max: { t: MaxKey() },
            'on shard': 'my-mongo-mongodb-sharded-shard-0',
            'last modified': Timestamp(0, 2)
          }
        ],
        tags: [
          { tag: 'TSR1', min: { t: MinKey() }, max: { t: 46} },
          { tag: 'TSR2', min: { t: 46 }, max: { t: MaxKey() } }
        ]
      }

To test the rules, use the provided Python script: modify the times variable and run it with various values.

You can run db.my_users.getShardDistribution() to view the data distribution on the shards.

[direct: mongos]> db.my_users.getShardDistribution()

Shard my-mongo-mongodb-sharded-shard-0 at my-mongo-mongodb-sharded-shard-0/my-mongo-mongodb-sharded-shard0-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard0-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017
{
  data: '144KiB',
  docs: 1667,
  chunks: 1,
  'estimated data per chunk': '144KiB',
  'estimated docs per chunk': 1667
}

Shard my-mongo-mongodb-sharded-shard-1 at my-mongo-mongodb-sharded-shard-1/my-mongo-mongodb-sharded-shard1-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard1-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017
{
  data: '195KiB',
  docs: 2336,
  chunks: 3,
  'estimated data per chunk': '65KiB',
  'estimated docs per chunk': 778
}

Adding More Shards

To add more shards to the cluster, all we need to do is run helm upgrade. If you don’t mess up the replica set key like I did, it should work on the first run.

helm upgrade my-mongo bitnami/mongodb-sharded --set shards=3,configsvr.replicas=2,shardsvr.dataNode.replicas=2,mongodbRootPassword=tcDMM5sqNC,replicaSetKey=D6BGM2ixd3

If you do mess up the key 😅, follow these steps to bring your cluster back online:

  1. Downgrade the cluster back to 2 shards.
  2. Open a shell in an old working shard (shard0 or shard1), for example with kubectl exec, and grab the credentials from the environment variables.

The Kubernetes secret and the mongos pod’s credentials have been overwritten by the upgrade, and they are wrong!

MONGODB_ROOT_PASSWORD=tcDMM5sqNC
MONGODB_ENABLE_DIRECTORY_PER_DB=no
MONGODB_SYSTEM_LOG_VERBOSITY=0
MY_MONGO_MONGODB_SHARDED_SERVICE_PORT=27017
KUBERNETES_SERVICE_HOST=10.245.0.1
MONGODB_REPLICA_SET_KEY=D6BGM2ixd3

After you save the correct password and replica set key, search for the volumes that belong to the shards which have the wrong replica set key and delete them. In my case I only deleted the volumes which belong to the 3rd shard that I’ve added. Since counting starts from 0, I’m looking for shard2 in the name.

@denis ➜ Downloads kubectl get persistentvolumeclaims
NAME                                               STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS       AGE
datadir-my-mongo-mongodb-sharded-configsvr-0       Bound    pvc-8e7fa303-9198-419e-a6c1-8de3e6d89962   8Gi        RWO            do-block-storage   132m
datadir-my-mongo-mongodb-sharded-configsvr-1       Bound    pvc-6e3bc70f-83a8-4e80-b856-c44a4295be35   8Gi        RWO            do-block-storage   131m
datadir-my-mongo-mongodb-sharded-shard0-data-0     Bound    pvc-f66647bc-ee3b-4820-b466-a11b197fde74   8Gi        RWO            do-block-storage   132m
datadir-my-mongo-mongodb-sharded-shard0-data-1     Bound    pvc-62257e91-d461-4ddb-af37-4876d2431703   8Gi        RWO            do-block-storage   131m
datadir-my-mongo-mongodb-sharded-shard1-data-0     Bound    pvc-9a062ba5-f320-49c9-ae15-d75e8e5f2cf8   8Gi        RWO            do-block-storage   132m
datadir-my-mongo-mongodb-sharded-shard1-data-1     Bound    pvc-068b04bd-8875-40d7-b47c-40092ceb7973   8Gi        RWO            do-block-storage   130m
datadir-my-mongo-mongodb-sharded-shard2-data-0     Bound    pvc-93d9a238-ae36-49e1-b0b6-f320baf89373   8Gi        RWO            do-block-storage   73m
datadir-my-mongo-mongodb-sharded-shard2-data-1     Bound    pvc-b09a8d0d-5012-4f23-8096-a713f3025521   8Gi        RWO            do-block-storage   50m
@denis ➜ Downloads kubectl get persistentvolumes
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                                                      STORAGECLASS       REASON   AGE
pvc-068b04bd-8875-40d7-b47c-40092ceb7973   8Gi        RWO            Delete           Bound    default/datadir-my-mongo-mongodb-sharded-shard1-data-1     do-block-storage            131m
pvc-321136d8-8a27-45cb-8ed1-8d636c530859   8Gi        RWO            Delete           Bound    default/datadir-my-release-mongodb-sharded-shard2-data-1   do-block-storage            143m
pvc-42dd7167-5836-4e94-bf42-473c6cea49a4   8Gi        RWO            Delete           Bound    default/datadir-my-release-mongodb-sharded-shard2-data-0   do-block-storage            145m
pvc-48714777-97b3-4acc-8562-7b69a8e3b488   8Gi        RWO            Delete           Bound    default/datadir-my-release-mongodb-sharded-shard1-data-1   do-block-storage            143m
pvc-499797e9-a5df-4c7b-a1fb-482c3dca36a6   8Gi        RWO            Delete           Bound    default/datadir-my-release-mongodb-sharded-shard3-data-1   do-block-storage            143m
pvc-61ec9e04-1bad-4312-ba16-fb24c12efb4b   8Gi        RWO            Delete           Bound    default/datadir-my-release-
...
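With many claims in the list it is easy to pick the wrong one, so here is a small Python sketch of the selection. It assumes the naming scheme visible in the listing above (…-shardN-data-M); pvcs_for_shard is a name I made up, and the sketch only selects names, it deletes nothing.

```python
import re

# Matches names such as "datadir-my-mongo-mongodb-sharded-shard2-data-0".
SHARD_PVC = re.compile(r"-shard(\d+)-data-\d+$")


def pvcs_for_shard(names, shard_index):
    """Return the PVC names that belong to the given zero-based shard index."""
    selected = []
    for name in names:
        match = SHARD_PVC.search(name)
        if match and int(match.group(1)) == shard_index:
            selected.append(name)
    return selected
```

Comparing the shard number as an integer avoids accidentally matching shard20 when you are looking for shard2.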

After that’s done, run the helm upgrade command again and, if everything is working, open a mongosh connection 😀.

Running sh.status() will now show the 3rd shard.

[
  {
    _id: 'my-mongo-mongodb-sharded-shard-0',
    host: 'my-mongo-mongodb-sharded-shard-0/my-mongo-mongodb-sharded-shard0-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard0-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1,
    tags: [ 'TSR2' ]
  },
  {
    _id: 'my-mongo-mongodb-sharded-shard-1',
    host: 'my-mongo-mongodb-sharded-shard-1/my-mongo-mongodb-sharded-shard1-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard1-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1,
    tags: [ 'TSR1' ]
  },
  {
    _id: 'my-mongo-mongodb-sharded-shard-2',
    host: 'my-mongo-mongodb-sharded-shard-2/my-mongo-mongodb-sharded-shard2-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard2-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1
  }
]

Next, update the sharding rules and everything will be working as in the diagram.

sh.addShardToZone("my-mongo-mongodb-sharded-shard-2", "TSR3")
sh.removeRangeFromZone("my_data.my_users", {t: 46}, {t: MaxKey()})
sh.updateZoneKeyRange("my_data.my_users", {t: 46}, {t: 1000}, "TSR2")
sh.updateZoneKeyRange("my_data.my_users", {t: 1000}, {t: MaxKey()}, "TSR3")
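The old TSR2 range has to be removed first because zone ranges on a collection may not overlap. The check can be sketched in Python, again with infinities as numeric stand-ins for MinKey() and MaxKey(); overlapping_zones is a helper I made up for illustration.

```python
import math

MIN_KEY, MAX_KEY = -math.inf, math.inf  # numeric stand-ins for MinKey()/MaxKey()


def overlapping_zones(ranges):
    """Return pairs of zone names whose half-open [low, high) ranges intersect."""
    ordered = sorted(ranges, key=lambda r: r[1])
    clashes = []
    for i, (name_a, _low_a, high_a) in enumerate(ordered):
        for name_b, low_b, _high_b in ordered[i + 1:]:
            # Ranges are sorted by lower bound, so they intersect
            # exactly when the later one starts before the earlier one ends.
            if low_b < high_a:
                clashes.append((name_a, name_b))
    return clashes
```

Adding the TSR3 range while TSR2 still runs up to MaxKey() reports a clash between TSR2 and TSR3; with TSR2 capped at 1000 the list comes back empty.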

sh.status() should now show something like this:

        chunks: [
          {
            min: { t: MinKey() },
            max: { t: 46 },
            'on shard': 'my-mongo-mongodb-sharded-shard-1',
            'last modified': Timestamp(0, 5)
          },
          {
            min: { t: 46 },
            max: { t: 1000 },
            'on shard': 'my-mongo-mongodb-sharded-shard-0',
            'last modified': Timestamp(3, 4)
          },
          {
            min: { t: 1000 },
            max: { t: MaxKey() },
            'on shard': 'my-mongo-mongodb-sharded-shard-2',
            'last modified': Timestamp(1, 5)
          }
        ],
        tags: [
          { tag: 'TSR1', min: { t: MinKey() }, max: { t: 46 } },
          { tag: 'TSR2', min: { t: 46 }, max: { t: 1000 } },
          { tag: 'TSR3', min: { t: 1000 }, max: { t: MaxKey() } }
        ]
      }

Conclusions

Sharding a MongoDB database can seem intimidating at first, but with some practice in advance you can do it! If sharding doesn’t work out for you, you can Convert Sharded Cluster to Replica Set, but be prepared with some backups.

Thanks for reading 📚 and happy hacking! 🔩🔨

Base64 PowerShell Function

function global:Convert-From-Base64 {
  [CmdletBinding()]
  [Alias('base64')]
  param (
    [parameter(ValueFromPipeline,Mandatory=$True,Position=0)]
    [string] $EncodedText
  )
  process {
    [System.Text.Encoding]::ASCII.GetString([System.Convert]::FromBase64String($EncodedText))
  }
}

Python Script

import random

import pymongo


def do_stuff():
    client = pymongo.MongoClient("mongodb://root:tcDMM5sqNC@127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000")
    col = client.my_data.my_users

    usernames = ["dovahkiin", "rey", "dey", "see", "mee", "rollin", "they", "hating"]
    hobbies = ["coding", "recording", "streaming", "batman", "football", "sports", "mathematics"]
    ages = [18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
    # times = [12, 14, 15, 23, 45, 32, 20]
    times = [47, 80, 93, 49, 96, 43]

    buffer = []
    for _ in range(1_000):
        first = random.choice(usernames).capitalize()
        mid = random.choice(usernames).capitalize()
        last = random.choice(usernames).capitalize()
        buffer.append(pymongo.InsertOne({
            "name": f"{first} '{mid}' {last}",
            "age": random.choice(ages),
            "hobbies": random.choice(hobbies),
            "t": random.choice(times)
        }))
    col.bulk_write(buffer)


if __name__ == '__main__':
    do_stuff()
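The script above hard-codes the root password in the connection string. A possible variation, sketched below, reads it from an environment variable and percent-encodes it so that special characters don’t break the URI. The variable name MONGODB_ROOT_PASSWORD matches the one used for mongosh earlier; build_uri and uri_from_env are names I made up.

```python
import os
from urllib.parse import quote_plus


def build_uri(password, user="root", host="127.0.0.1", port=27017):
    """Build a MongoDB URI with the credentials percent-encoded."""
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}"
        "/?directConnection=true&serverSelectionTimeoutMS=2000"
    )


def uri_from_env(variable="MONGODB_ROOT_PASSWORD"):
    """Read the password from the environment (assumed to be exported already)."""
    return build_uri(os.environ[variable])
```

The result of uri_from_env() can be passed straight to pymongo.MongoClient in place of the literal string.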

References

https://bitnami.com/stack/mongodb-sharded/helm

https://docs.microsoft.com/en-us/azure/architecture/patterns/sharding

https://docs.mongodb.com/manual/core/zone-sharding/

https://docs.mongodb.com/manual/core/ranged-sharding/

https://docs.mongodb.com/manual/reference/method/sh.updateZoneKeyRange/

https://docs.mongodb.com/v5.0/core/sharding-choose-a-shard-key/

Blue vector created by starline – www.freepik.com

Kafka Connect GitHub source connector

Hello!

In this article we will discuss how to quickly get started with Kafka and Kafka Connect to grab all the commits from a GitHub repository.

This is a practical tutorial which saves you some time browsing Kafka’s documentation.

Environment

Kafka is a bit difficult to set up: you will need at least Kafka, ZooKeeper and Kafka Connect. To simplify the setup we’re going to use Docker and docker-compose.

Once you have Docker and docker-compose installed on your system, you can run a single command and get a Kafka environment for development.

Let’s start Kafka by using the Confluent Platform repository. Make sure you have Docker, docker-compose and Git installed before proceeding.

git clone https://github.com/confluentinc/cp-all-in-one
cd cp-all-in-one/cp-all-in-one-community
docker-compose up

After the docker-compose up command finishes, Kafka and its related components should be up and running. To make sure all the components are running properly, please start a new terminal window and compare your docker ps output with mine:

CONTAINER ID        IMAGE                                         COMMAND                  CREATED             STATUS                            PORTS                                                                      NAMES
fc95d4828397        confluentinc/ksqldb-examples:5.5.1            "bash -c 'echo Waiti…"   6 minutes ago       Up 6 minutes                                                                                                 ksql-datagen
28e67d645888        confluentinc/cp-ksqldb-cli:5.5.1              "/bin/sh"                6 minutes ago       Up 6 minutes                                                                                                 ksqldb-cli
371fd742ad1f        confluentinc/cp-ksqldb-server:5.5.1           "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes (healthy)            0.0.0.0:8088->8088/tcp                                                     ksqldb-server
caeb86c71308        cnfldemos/kafka-connect-datagen:0.3.2-5.5.0   "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes (health: starting)   0.0.0.0:8083->8083/tcp, 9092/tcp                                           connect
c4d90361c677        confluentinc/cp-kafka-rest:5.5.1              "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes                      0.0.0.0:8082->8082/tcp                                                     rest-proxy
c3752a0c1487        confluentinc/cp-schema-registry:5.5.1         "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes                      0.0.0.0:8081->8081/tcp                                                     schema-registry
14bfbef71822        confluentinc/cp-kafka:5.5.1                   "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes                      0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp, 0.0.0.0:29092->29092/tcp   broker
f2ddb8971efa        confluentinc/cp-zookeeper:5.5.1               "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes                      2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp                                 zookeeper

If it looks similar then you’re ready to start configuring Kafka Connect.

Kafka Connect

Kafka Connect is a great tool for building data pipelines. We’re going to use it to get data from GitHub into Kafka.

We could write a simple Python producer to do that: query GitHub’s API and produce a record for Kafka using a client library. But Kafka Connect comes with additional benefits. It’s like Docker containers for producers and consumers. In fact, in Kafka Connect the producers are called source connectors and the consumers are called sink connectors.

Installing the GitHub connector

To install the GitHub connector, which is supported by Confluent, you have to download it from here (click Download zip).

After the download has finished, unzip the files.

Before installing it, visit http://localhost:8083/connector-plugins to see the currently installed connectors. There is no GitHub connector yet.

To install it, we need to copy its folder into the Kafka Connect container and restart the container:

docker cp ./confluentinc-kafka-connect-github-1.0.1/. connect:/usr/share/java
docker restart connect

If we visit http://localhost:8083/connector-plugins again, we’ll see that the connector has been installed successfully:

{"class":"io.confluent.connect.github.GithubSourceConnector","type":"source","version":"1.0.1"},
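If you prefer to script this check instead of opening the URL in a browser, a minimal Python sketch could look like the one below. It assumes the Connect REST API is reachable on localhost:8083, as in this setup; has_plugin and fetch_plugins are names I made up.

```python
import json
from urllib.request import urlopen

GITHUB_CLASS = "io.confluent.connect.github.GithubSourceConnector"


def has_plugin(plugins, connector_class=GITHUB_CLASS):
    """Return True if the parsed /connector-plugins response lists the class."""
    return any(plugin.get("class") == connector_class for plugin in plugins)


def fetch_plugins(base_url="http://localhost:8083"):
    """Fetch and parse the list of installed connector plugins."""
    with urlopen(f"{base_url}/connector-plugins", timeout=5) as response:
        return json.load(response)
```

Calling has_plugin(fetch_plugins()) after the restart should return True once the Connect worker has finished starting up.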

Configuring the connector

There is no configuration for the connector yet, so the connector isn’t running. To configure a connector you need to make a POST request to the configuration endpoint. You can have as many configurations as you like for the same connector, and each configuration runs on its own; the catch is that each one needs a different name.

To view the already configured connectors we can visit http://localhost:8083/connectors/. As expected, an empty JSON array should be returned.

To configure the GitHub connector, I took the sample config from its documentation and adapted it into JSON format.

I’ve also chosen to use the JsonConverter instead of the AvroConverter for simplicity, and I’m getting the commits of the repo rather than the stargazers.

Here’s the config:

{
  "name": "MyGithubConnector",
  "config": {
    "connector.class": "io.confluent.connect.github.GithubSourceConnector",
    "confluent.topic.bootstrap.servers": "broker:29092",
    "tasks.max": "1",
    "confluent.topic.replication.factor": "1",
    "github.service.url": "https://api.github.com",
    "github.repositories": "apache/kafka",
    "github.tables": "commits",
    "github.since": "2019-01-01",
    "github.access.token": "YOUR_PERSONAL_ACCESS_TOKEN",
    "topic.name.pattern": "github-avro-${entityName}",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

Now we need to upload the config to Kafka Connect. I saved it as a file named github-connector-config.json and used curl to upload it. If the upload is successful, the server should echo back your config:

➜ cp-all-in-one-community git:(5.5.1-post) ✗ curl -X POST -H "Content-Type: application/json" --data @github-connector-config.json http://localhost:8083/connectors

{"name":"MyGithubConnector","config":{"connector.class":"io.confluent.connect.github.GithubSourceConnector","confluent.topic.bootstrap.servers":"broker:29092","tasks.max":"1","confluent.topic.replication.factor":"1","github.service.url":"https://api.github.com","github.repositories":"apache/kafka","github.tables":"commits","github.since":"2019-01-01","github.access.token":"xxx","topic.name.pattern":"github-avro-${entityName}","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":" org.apache.kafka.connect.json.JsonConverter","name":"MyGithubConnector"},"tasks":[],"type":"source"}%
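The same upload can be done without curl. Below is a rough Python sketch using only the standard library; it assumes the Connect REST API listens on localhost:8083 and that config is the dictionary loaded from github-connector-config.json. The function names are mine.

```python
import json
from urllib.request import Request, urlopen


def build_create_request(config, base_url="http://localhost:8083"):
    """Build the POST /connectors request that registers a connector config."""
    return Request(
        f"{base_url}/connectors",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(config, base_url="http://localhost:8083"):
    """Send the request and return the parsed response body."""
    with urlopen(build_create_request(config, base_url), timeout=10) as response:
        return json.load(response)
```

Splitting the request construction from the network call lets you inspect the URL, method and payload before anything is sent.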

Visiting http://localhost:8083/connectors/ now returns the name of our GitHub connector:

["MyGithubConnector"]

If we check its status (http://localhost:8083/connectors/MyGithubConnector/status) we should see that the connector is already running:

{"name":"MyGithubConnector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
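When scripting the setup it helps to turn that status document into a yes/no answer. Here is a small Python sketch that works on the parsed JSON shown above; is_running is a name I made up, and treating a connector with no tasks as not yet running is my own choice.

```python
def is_running(status):
    """Return True when the connector and at least one task report RUNNING,
    and no task reports any other state."""
    tasks = status.get("tasks", [])
    connector_ok = status["connector"]["state"] == "RUNNING"
    tasks_ok = bool(tasks) and all(task["state"] == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok
```

Right after the connector is created the tasks list can still be empty, so polling this function a few times is usually more reliable than a single check.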

The connector will pull all the commits it can get. In the 5 minutes I let it run, it pulled over 2.5k GitHub commits.

I will pause it in order to preserve disk space:

curl -X PUT localhost:8083/connectors/MyGithubConnector/pause

Inspecting the data in Kafka

To inspect the data I use a tool called kafkacat.

Running kafkacat -C -b localhost:9092 -t github-avro-commits should show you the data collected from Github:

...
"commit":{ "author":{ "name":"hejiefang", "email":"xxxx", "date":1546437651000 }, "committer":{ "name":"Manikumar Reddy", "email":"xxxx", "date":1546437651000 }, "message":"MINOR: Fix doc format in upgrade notes\n\nAuthor: hejiefang <he.jiefang@zte.com.cn>\n\nReviewers: Srinivas <xxxx>, Manikumar Reddy <xxxx>\n\nCloses #6076 from hejiefang/modifynotable", "tree":{ "sha":"91db6646f829d6636011d09fdc8643e36280716b", "url":"https://api.github.com/repos/apache/kafka/git/trees/91db6646f829d6636011d09fdc8643e36280716b" }, "url":"https://api.github.com/repos/apache/kafka/git/commits/ffd6f2a2e8a573695d0c1c98e663f0b8198b1b6d", "comment_count":0, "verification":{ "verified":false, "reason":"unsigned", "signature":null, "payload":null } },
...
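The raw records are quite verbose, so a tiny Python sketch for pulling out the interesting bits may help. It assumes the record has already been parsed into a dictionary with a top-level commit field, as in the excerpt above; depending on the converter settings the data may be wrapped in an extra envelope, in which case you would unwrap it first. summarize_commit is a name I made up.

```python
def summarize_commit(record):
    """Return (author name, first line of the commit message) from one record."""
    commit = record["commit"]
    subject = commit["message"].split("\n", 1)[0]
    return commit["author"]["name"], subject
```

For the record shown above this gives the author hejiefang and the subject line "MINOR: Fix doc format in upgrade notes".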

Summary

We’ve successfully set up a simple Kafka playground with docker-compose and installed a source connector for GitHub. It was fairly easy, and Kafka Connect’s REST API is quite powerful and user friendly.

In the next article we will take the commits data and index it in Elasticsearch using a sink connector.

Thanks for reading!

Bonus

You can debug the connector by modifying the logging level:

# Set the connector's log level to TRACE
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/io.confluent.connect.github.GithubSourceConnector \
-d '{"level": "TRACE"}' \
| jq '.'