Kafka Connect MongoDB Sink Connector

Hello 👋,

In this article we’re going to build a data pipeline that connects Kafka to MongoDB.

In short, we’re going to add a MongoDB Sink connector to a Kafka Connect cluster and run a MongoDB instance in Docker to test the connector.

By reading this article I hope that you will learn

  • How to install the MongoDB connector in Kafka Connect
  • How to configure the MongoDB connector
  • How to create topics in Kafka using Confluent Tools
  • How to monitor Kafka Connect using JConsole.

Let’s get started!

Running MongoDB with Docker Compose 🚢

Confluent provides us with a docker-compose file that already contains everything we need, except for some minor tweaks.

Please download the following file and open it in your favorite editor: https://github.com/confluentinc/cp-all-in-one/blob/6.2.0-post/cp-all-in-one-community/docker-compose.yml.

Apply the following edits to the file, you can replace the connect block and append the mongodb block at the end of the file.

  
  connect:
    image: cnfldemos/kafka-connect-datagen:0.5.0-6.2.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
      - "9102:9102"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost

  mongodb:
    image: mongo:4.2-rc-bionic
    hostname: mongodb
    container_name: mongodb
    depends_on:
      - broker
      - connect
    ports:
      - 27017:27017

Run docker-compose up to start all services and verify that everything is running with docker ps:

CONTAINER ID   IMAGE                                         COMMAND                  CREATED          STATUS
         PORTS
                       NAMES
95165f0156f4   confluentinc/cp-ksqldb-cli:6.2.0              "/bin/sh"                37 minutes ago   Up 37 minutes    
                       ksqldb-cli
ecc4cde0f30b   confluentinc/ksqldb-examples:6.2.0            "bash -c 'echo Waiti…"   37 minutes ago   Up 37 minutes    
                       ksql-datagen
962204b34543   mongo:4.2-rc-bionic                           "docker-entrypoint.s…"   37 minutes ago   Up 37 minutes             0.0.0.0:27017->27017/tcp, :::27017->27017/tcp
                       mongodb
c950f33f501a   confluentinc/cp-ksqldb-server:6.2.0           "/etc/confluent/dock…"   37 minutes ago   Up 37 minutes             0.0.0.0:8088->8088/tcp, :::8088->8088/tcp
                       ksqldb-server
3527577701d3   confluentinc/cp-kafka-rest:6.2.0              "/etc/confluent/dock…"   37 minutes ago   Up 37 minutes             0.0.0.0:8082->8082/tcp, :::8082->8082/tcp
                       rest-proxy
ca69f204f4bb   cnfldemos/kafka-connect-datagen:0.5.0-6.2.0   "/etc/confluent/dock…"   37 minutes ago   Up 31 minutes (healthy)   0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 0.0.0.0:9102->9102/tcp, :::9102->9102/tcp, 9092/tcp
                       connect
aeaea67059c3   confluentinc/cp-schema-registry:6.2.0         "/etc/confluent/dock…"   37 minutes ago   Up 37 minutes             0.0.0.0:8081->8081/tcp, :::8081->8081/tcp
                       schema-registry
b9a761b98a49   confluentinc/cp-kafka:6.2.0                   "/etc/confluent/dock…"   37 minutes ago   Up 37 minutes             0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:9101->9101/tcp, :::9101->9101/tcp, 0.0.0.0:29092->29092/tcp, :::29092->29092/tcp   broker
ca63570b60d4   confluentinc/cp-zookeeper:6.2.0               "/etc/confluent/dock…"   37 minutes ago   Up 37 minutes             2888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 3888/tcp
                       zookeeper

Installing the MongoDB Sink Connector on Kafka Connect 🌠

You may download the connector directly from Github mongodb/mongo-kafka/releases/tag/r1.6.0.

Click on mongodb-kafka-connect-mongodb-1.6.0.zip then unzip it and copy the directory into the plugin path /usr/share/java as defined in the CONNECT_PLUGIN_PATH: “/usr/share/java,/usr/share/confluent-hub-components” environment variable.

To copy it you can run:

docker cp .\mongodb-kafka-connect-mongodb-1.6.0\ connect:/usr/share/java/
 docker restart connect
connect

Connect needs to be restarted to pick-up the newly installed plugin. Verify that the connector plugin has been successfully installed:

➜  bin curl -s -X GET http://localhost:8083/connector-plugins | jq | head -n 20
[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "1.6.0"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "1.6.0"
  },

Note: If you don’t have jq installed you can omit it.

Creating the topics

Before starting the connector, let’s create the Kafka Topics events and events.deadletter, they will be used them in the connector.

To create the topics, we will need to download Confluent tools and run kafka-topics.

curl -s -O http://packages.confluent.io/archive/6.2/confluent-community-6.2.0.tar.gz
tar -xzf .\confluent-community-6.2.0.tar.gz
cd .\confluent-6.2.0\bin\

 ./kafka-topics --bootstrap-server localhost:9092 --list
__consumer_offsets
__transaction_state
_confluent-ksql-default__command_topic
_schemas
default_ksql_processing_log
docker-connect-configs
docker-connect-offsets
docker-connect-status

./kafka-topics --bootstrap-server localhost:9092 --create --topic events --partitions 3
Created topic events.

./kafka-topics --bootstrap-server localhost:9092 --create --topic events.deadletter --partitions 3
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues, it is best to use either, but not both.
Created topic events.deadletter.

Note: You will need Java to run the Confluent tools if you’re on Ubuntu you can type sudo apt install openjdk-8-jdk.

Starting the connector 🚙

To start the connector, it is enough to do a single post request to the connector’s API with the connector’s configuration.

The configuration that we will use is going to be:

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
	"name": "mongo-sink-connector",
	"config": {
		"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
		"tasks.max": "1",
		"topics": "events",
		"connection.uri": "mongodb://mongodb:27017/my_events",
		"database": "my_events",
		"collection": "kafka_events",
		"max.num.retries": 5,
		"mongo.errors.tolerance": "all",
		"mongo.errors.log.enable": true,
		"errors.log.include.messages": true,
		"errors.deadletterqueue.topic.name": "events.deadletter",
		"errors.deadletterqueue.context.headers.enable": true,
	}
}'

In short, this POST will create a new connector named mongo-sink-connector using the com.mongodb.kafka.connect.MongoSinkConnector java class, run a single connector task that will get all the messages from the events topic and put them into the Mongo found at mongodb://mongodb:27017/my_events, database named my_events and collection named kafka_events. The records which will fail to be written into the database will be placed on a dead letter topic named events.deadletter, in my opinion this is better than discarding them, since we can inspect the topic to see what went wrong.

To verify that the connector is running, you can retrieve its first tasks status with:

➜  bin curl -s -X GET http://localhost:8083/connectors/mongo-sink-connector/tasks/0/status | jq
{
  "id": 0,
  "state": "RUNNING",
  "worker_id": "connect:8083"
}

Querying the Database 🗃

Now that our Kafka Connect cluster is running and is configured, all that’s left to do is POST some dummy data into Kafka and check for it in the database.

curl --request POST \
  --url http://localhost:8082/topics/events \
  --header 'Content-Type: application/vnd.kafka.json.v2+json' \
  --data '{
	"records": [
		{
			"key": "somekey",
			"value": {
				"glossary": {
					"title": "example glossary",
					"GlossDiv": {
						"title": "S",
						"GlossList": {
							"GlossEntry": {
								"ID": "SGML",
								"SortAs": "SGML",
								"GlossTerm": "Standard Generalized Markup Language",
								"Acronym": "SGML",
								"Abbrev": "ISO 8879:1986",
								"GlossDef": {
									"para": "A meta-markup language, used to create markup languages such as DocBook.",
									"GlossSeeAlso": [
										"GML",
										"XML"
									]
								},
								"GlossSee": "markup"
							}
						}
					}
				}
			}
		}
	]
}'

That’s all! 🎉If we now connect to the database using mongosh or any other client, we can query the data.

mongosh
> use my_events
switched to db my_events
> db.kafka_events.findOne()
{
  _id: ObjectId("6147242856623b0098fc756d"),
  glossary: {
    title: 'example glossary',
    GlossDiv: {
      title: 'S',
      GlossList: {
        GlossEntry: {
          ID: 'SGML',
          SortAs: 'SGML',
          GlossTerm: 'Standard Generalized Markup Language',
          Acronym: 'SGML',
          Abbrev: 'ISO 8879:1986',
          GlossDef: {
            para: 'A meta-markup language, used to create markup languages such as DocBook.',
            GlossSeeAlso: [ 'GML', 'XML' ]
          },
          GlossSee: 'markup'
        }
      }
    }
  }
}

Viewing Kafka Connect JMX Metrics

JConsole is a tool that can be used to view JMX metrics exposed by Kafka Connect, if you installed openjdk-8 it should come with it

Start JConsole and connect to localhost:9102. If you get a warning about an insecure connection, accept the connection, and ignore it.

After you’re connected click the MBeans tab and explore 🦹‍♀️

Summary

Getting into Kafka and Kafka Connect can be a bit overwhelming at first. I hope that this tutorial has provided you with the necessary basics so you can continue to play and explore on your own.

Spinning up a playground for Kafka and Connect using docker-compose isn’t that complicated, you can start from the confluent-cp-community repo, it will give you everything you need to get started. With some little modifications to the docker-compose file, we’ve spawned a MongoDB instance and exposed the JMX metrics in Kafka Connect.

Next, we’ve installed and configured the MongoDB connector and confirmed that it works as expected.

If you have any questions let me know in the comments.

Until next time! 🍻

Improving the throughput of a Producer ✈

Hello 👋,

In this article I will give you some tips on how to improve the throughput of a message producer.

I had to write a Golang based application which would consume messages from Apache Kafka and send them into a sink using HTTP JSON / HTTP Protocol Buffers.

To see if my idea works, I started using a naïve approach in which I polled Kafka for messages and then send each message into the sink, one at a time. This worked, but it was slow.

To better understand the system, a colleague has setup Grafana and a dashboard for monitoring, using Prometheus metrics provided by the Sink. This allowed us to test various versions of the producer and observe it’s behavior.

Let’s explore what we can do to improve the throughput.

Request batching 📪

A very important improvement is request batching.

Instead of sending one message at a time in a single HTTP request, try to send more, if the sink allows it.

As you can see in the image, this simple idea improved the throughput from 100msg/sec to ~4000msg/sec.

Batching is tricky, if your batches are large the receiver might be overwhelmed, or the producer might have a tough time building them. If your batches contain a few items you might not see an improvement. Try to choose a batch number which isn’t too high and not to low either.

Fast JSON libraries ⏩

If you’re using HTTP and JSON then it’s a good idea to replace the standard JSON library.

There are lots of open-source JSON libraries that provide much higher performance compared to standard JSON libraries that are built in the language.

See:

The improvements will be visible.

Partitioning 🖇

There are several partitioning strategies that you can implement. It depends on your tech stack.

Kafka allows you to assign one consumer to one partition, if you have 3 partitions in a topic then you can run 3 consumer instances in parallel from that topic, in the same consumer group, this is called replication, I did not use this as the Sink does not allow it, only one instance of the Producer is running at a time.

If you have multiple topics that you want to consume from, you can partition on the topic name or topic name pattern by subscribing to multiple topics at once using regex. You can have 3 consumers consuming from highspeed.* and 3 consumer consuming from other.*. If each topic has 3 partitions.

Note: The standard build of librdkafka doesn’t support negative lookahead regex expressions, if that’s what you need you will need to build the library from source. See issues/2769. It’s easy to do and the confluent-kafka-go client supports custom builds of librdkafka.

Negative lookahead expressions allow you to ignore some patterns, see this example for a better understanding: regex101.com/r/jZ9AEz/1

Source Parallelization 🕊

The confluent-kafka-go client allows you to poll Kafka for messages. Since polling is thread safe, it can be done in multiple goroutines or threads for a performance improvement.

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    var wg sync.WaitGroup
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}
   
	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
    for i := 0; i < 5; i++ {
        go func() {
            wg.Add(1)
            defer wg.Done()
	        for {
	    	    msg, err := c.ReadMessage(-1)
		        if err == nil {
		    	    fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
                    // TODO: Send data through a channel to be processed by another goroutine.
    		    } else {
			        // The client will automatically try to recover from all errors.
			        fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		        }
	        }
        }()
    }
    wg.Wait()

	c.Close()
}

Buffered Golang channels can also be used in this scenario in order to improve the performance.

Protocol Buffers 🔷

Finally, I saw a huge performance improvement when replacing the JSON body of the request with Protocol Buffers encoded and snappy compressed data.

If your Sink supports receiving protocol buffers, then it is a good idea to try sending it instead of JSON.

Honorable Mention: GZIP Compressed JSON 📚

The Sink supported receiving GZIP compressed JSON, but in my case I didn’t see any notable performance improvements.

I’ve compared the RAM and CPU usage of the Producer, the number of bytes sent over the network and the message throughput. While there were some improvements in some areas, I decided not to implement GZIP compression.

It’s all about trade-offs and needs.

Conclusion

As you could see, there are several things you can do to your producers in order to make them more efficient.

  • Request Batching
  • Fast JSON Libraries
  • Partitioning
  • Source Parallelization & Buffering
  • Protocol Buffers
  • Compression

I hope you’ve enjoyed this article and learned something! If you have some ideas, please let me know in the comments.

Thanks for reading! 😀

Sharding MongoDB using Range strategy

Hi 👋👋

In this article I will explore the topic of sharding a Mongo Database that runs on Kubernetes. Before we get started, if you want to follow along, please install the tools listed in the prerequisites section, and if you want to learn more about sharding, check out this fantastic article Sharding Pattern.

Prerequisites

Introduction

Let’s install a MongoDB instance on the Kubernetes cluster using helm.

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-mongo bitnami/mongodb-sharded

After the installation completes, save the database’s root password and replica set key. While doing this the first time I messed up and didn’t save them properly.

Run the following commands to print the password and replica set key on the command line. If you’re on Windows I have provided you with a Powershell function for base64 and if you’re on Unix don’t forget to pass –decode to base64.

kubectl get secret --namespace default my-release-mongodb-sharded -o jsonpath="{.data.mongodb-root-password}" | base64
kubectl get secret --namespace default my-release-mongodb-sharded -o jsonpath="{.data.mongodb-replica-set-key}" | base64

Sharding the Database

Verify that all your pods are running and start a shell connection to the mongos server.

	@denis ➜ ~ kubectl get pods
	NAME                                              READY   STATUS    RESTARTS   AGE
	my-mongo-mongodb-sharded-configsvr-0              1/1     Running   0          3m8s
	my-mongo-mongodb-sharded-configsvr-1              1/1     Running   0          116s
	my-mongo-mongodb-sharded-mongos-c4dd66768-dqlbv   1/1     Running   0          3m8s
	my-mongo-mongodb-sharded-shard0-data-0            1/1     Running   0          3m8s
	my-mongo-mongodb-sharded-shard0-data-1            1/1     Running   0          103s
	my-mongo-mongodb-sharded-shard1-data-0            1/1     Running   0          3m8s
my-mongo-mongodb-sharded-shard1-data-1            1/1     Running   0          93s
kubectl port-forward --namespace default svc/my-mongo-mongodb-sharded 27017:27017
# and in another terminal:
mongosh --host 127.0.0.1 --authenticationDatabase admin -u root -p $MONGODB_ROOT_PASSWORD

By running sh.status() you should get an output which contains two mongo shards:

shards
[
  {
    _id: 'my-mongo-mongodb-sharded-shard-0',
    host: 'my-mongo-mongodb-sharded-shard-0/my-mongo-mongodb-sharded-shard0-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard0-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1
  },
  {
    _id: 'my-mongo-mongodb-sharded-shard-1',
    host: 'my-mongo-mongodb-sharded-shard-1/my-mongo-mongodb-sharded-shard1-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard1-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1
  }
]

To enable sharding on the database and collection, I’m going to insert some dummy data in my_data database and my_users collections. The script used to insert the data is attached at the end of this blog post.

[direct: mongos]> sh.enableSharding("my_data")
{
  ok: 1,
  operationTime: Timestamp(3, 1628345449),
  '$clusterTime': {
    clusterTime: Timestamp(3, 1628345449),
    signature: {
      hash: Binary(Buffer.from("e57c8c37047f7aa170fb59f6b11e22aa65159a30", "hex"), 0),
      keyId: Long("6993682727694237708")
    }
  }
}

[direct: mongos]> db.my_users.createIndex({"t": 1})
[direct: mongos]> sh.shardCollection("my_data.my_users", { "t": 1 })

sh.addShardToZone("my-mongo-mongodb-sharded-shard-1", "TSR1")
sh.addShardToZone("my-mongo-mongodb-sharded-shard-0", "TSR2")

If you’ve made it this far, congrats, you’ve enabled sharding, now let’s define some rules.

Since we’re going to use a range sharding strategy based on the key t, and I have two shards available I would like my data to be distributed in the following way:

 sh.updateZoneKeyRange("my_data.my_users", {t: 46}, {t: MaxKey()}, "TSR2")
 sh.updateZoneKeyRange("my_data.my_users", {t: MinKey()}, {t: 46}, "TSR1")

Note: The label on the TSR2 Zone is wrong, the correct value is: 46 ≤ t < 1000

Running sh.status() should now yield the following output.

    collections: {
      'my_data.my_users': {
        shardKey: { t: 1 },
        unique: false,
        balancing: true,
        chunkMetadata: { shard: 'my-mongo-mongodb-sharded-shard-1', nChunks: 3 },
        chunks: [
          {
            min: { t: MinKey() },
            max: { t: 45 },
            'on shard': 'my-mongo-mongodb-sharded-shard-1',
            'last modified': Timestamp(2, 1)
          },
          {
            min: { t: 46 },
            max: { t: MaxKey() },
            'on shard': 'my-mongo-mongodb-sharded-shard-0',
            'last modified': Timestamp(0, 2)
          }
        ],
        tags: [
          { tag: 'TSR1', min: { t: MinKey() }, max: { t: 46} },
          { tag: 'TSR2', min: { t: 46 }, max: { t: MaxKey() } }
        ]
      }

To test the rules, use the provided python script, modify the times variable and run it with various values.

You can run db.my_users.getShardDistribution() to view the data distribution on the shards.

[direct: mongos]> db.my_users.getShardDistribution()

Shard my-mongo-mongodb-sharded-shard-0 at my-mongo-mongodb-sharded-shard-0/my-mongo-mongodb-sharded-shard0-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard0-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017
{
  data: '144KiB',
  docs: 1667,
  chunks: 1,
  'estimated data per chunk': '144KiB',
  'estimated docs per chunk': 1667
}

Shard my-mongo-mongodb-sharded-shard-1 at my-mongo-mongodb-sharded-shard-1/my-mongo-mongodb-sharded-shard1-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard1-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017
{
  data: '195KiB',
  docs: 2336,
  chunks: 3,
  'estimated data per chunk': '65KiB',
  'estimated docs per chunk': 778
}

Adding More Shards

To add more shards to the cluster all we need to do is run helm upgrade, if you don’t mess up the replica set key like I did it should work on the first run.

helm upgrade my-mongo bitnami/mongodb-sharded --set shards=3,configsvr.replicas=2,shardsvr.dataNode.replicas=2,mongodbRootPassword=tcDMM5sqNC,replicaSetKey=D6BGM2ixd3

If you mess up the key 😅, then to solve the issue and bring your cluster back online follow these steps.

  1. downgrade the cluster back to 2 shards
  2. SSH into an old working shard shard1 or shard0, and grab the credentials from the environment variables.

The kubernetes secret and mongos pod’s credential have been overridden by the upgrade and they are wrong!

MONGODB_ROOT_PASSWORD=tcDMM5sqNC
MONGODB_ENABLE_DIRECTORY_PER_DB=no
MONGODB_SYSTEM_LOG_VERBOSITY=0
MY_MONGO_MONGODB_SHARDED_SERVICE_PORT=27017
KUBERNETES_SERVICE_HOST=10.245.0.1
MONGODB_REPLICA_SET_KEY=D6BGM2ixd3

After you save the correct password and replica set key, search for the volumes that belong to the shards which have the wrong replica set key and delete them. In my case I only delete the volumes which belong to the 3rd shard that I’ve added, since counting starts from 0, I’m looking for shard2 in the name.

@denis ➜ Downloads kubectl get persistentvolumeclaims
NAME                                               STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS       AGE
datadir-my-mongo-mongodb-sharded-configsvr-0       Bound    pvc-8e7fa303-9198-419e-a6c1-8de3e6d89962   8Gi        RWO            do-block-storage   132m
datadir-my-mongo-mongodb-sharded-configsvr-1       Bound    pvc-6e3bc70f-83a8-4e80-b856-c44a4295be35   8Gi        RWO            do-block-storage   131m
datadir-my-mongo-mongodb-sharded-shard0-data-0     Bound    pvc-f66647bc-ee3b-4820-b466-a11b197fde74   8Gi        RWO            do-block-storage   132m
datadir-my-mongo-mongodb-sharded-shard0-data-1     Bound    pvc-62257e91-d461-4ddb-af37-4876d2431703   8Gi        RWO            do-block-storage   131m
datadir-my-mongo-mongodb-sharded-shard1-data-0     Bound    pvc-9a062ba5-f320-49c9-ae15-d75e8e5f2cf8   8Gi        RWO            do-block-storage   132m
datadir-my-mongo-mongodb-sharded-shard1-data-1     Bound    pvc-068b04bd-8875-40d7-b47c-40092ceb7973   8Gi        RWO            do-block-storage   130m
datadir-my-mongo-mongodb-sharded-shard2-data-0     Bound    pvc-93d9a238-ae36-49e1-b0b6-f320baf89373   8Gi        RWO            do-block-storage   73m
datadir-my-mongo-mongodb-sharded-shard2-data-1     Bound    pvc-b09a8d0d-5012-4f23-8096-a713f3025521   8Gi        RWO            do-block-storage   50m
@denis ➜ Downloads kubectl get persistentvolumes
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                                                      STORAGECLASS       REASON   AGE
pvc-068b04bd-8875-40d7-b47c-40092ceb7973   8Gi        RWO            Delete           Bound    default/datadir-my-mongo-mongodb-sharded-shard1-data-1     do-block-storage            131m
pvc-321136d8-8a27-45cb-8ed1-8d636c530859   8Gi        RWO            Delete           Bound    default/datadir-my-release-mongodb-sharded-shard2-data-1   do-block-storage            143m
pvc-42dd7167-5836-4e94-bf42-473c6cea49a4   8Gi        RWO            Delete           Bound    default/datadir-my-release-mongodb-sharded-shard2-data-0   do-block-storage            145m
pvc-48714777-97b3-4acc-8562-7b69a8e3b488   8Gi        RWO            Delete           Bound    default/datadir-my-release-mongodb-sharded-shard1-data-1   do-block-storage            143m
pvc-499797e9-a5df-4c7b-a1fb-482c3dca36a6   8Gi        RWO            Delete           Bound    default/datadir-my-release-mongodb-sharded-shard3-data-1   do-block-storage            143m
pvc-61ec9e04-1bad-4312-ba16-fb24c12efb4b   8Gi        RWO            Delete           Bound    default/datadir-my-release-
...

After that’s done, run the helm upgrade command again and if everything is working get a mongosh connection 😀.

Running sh.status() will now show the 3rd shard.

[
  {
    _id: 'my-mongo-mongodb-sharded-shard-0',
    host: 'my-mongo-mongodb-sharded-shard-0/my-mongo-mongodb-sharded-shard0-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard0-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1,
    tags: [ 'TSR2' ]
  },
  {
    _id: 'my-mongo-mongodb-sharded-shard-1',
    host: 'my-mongo-mongodb-sharded-shard-1/my-mongo-mongodb-sharded-shard1-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard1-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1,
    tags: [ 'TSR1' ]
  },
  {
    _id: 'my-mongo-mongodb-sharded-shard-2',
    host: 'my-mongo-mongodb-sharded-shard-2/my-mongo-mongodb-sharded-shard2-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard2-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017',
    state: 1
  }
]

Next, update the sharding rules and everything will be working as in the diagram.

sh.addShardToZone("my-mongo-mongodb-sharded-shard-2", "TSR3")
sh.removeRangeFromZone("my_data.my_users", {t: 46}, {t: MaxKey()}, "TSR2")
sh.updateZoneKeyRange("my_data.my_users", {t: 46}, {t 1000}, "TSR2")
sh.updateZoneKeyRange("my_data.my_users", {t: 1000}, {t: MaxKey()}, "TSR3")

sh.status() should show something like

        chunks: [
          {
            min: { t: MinKey() },
            max: { t: 46 },
            'on shard': 'my-mongo-mongodb-sharded-shard-1',
            'last modified': Timestamp(0, 5)
          },
          {
            min: { t: 46 },
            max: { t: 1000 },
            'on shard': 'my-mongo-mongodb-sharded-shard-0',
            'last modified': Timestamp(3, 4)
          },
          {
            min: { t: 1000 },
            max: { t: MaxKey() },
            'on shard': 'my-mongo-mongodb-sharded-shard-2',
            'last modified': Timestamp(1, 5)
          }
        ],
        tags: [
          { tag: 'TSR1', min: { t: MinKey() }, max: { t: 46 } },
          { tag: 'TSR2', min: { t: 46 }, max: { t: 1000 } },
          { tag: 'TSR3', min: { t: 1000 }, max: { t: MaxKey() } }
        ]
      }

Conclusions

Shading a MongoDB can seem intimidating at first, but with some practice in advance you can do it! If sharding doesn’t work out for you, you can Convert Sharded Cluster to Replica Set, but, be prepared with some backups.

Thanks for reading 📚 and happy hacking! 🔩🔨

Base64 Powershell Function
function global:Convert-From-Base64 {
  [CmdletBinding()]
  [Alias('base64')]
  param (
    [parameter(ValueFromPipeline,Mandatory=$True,Position=0)]
    [string] $EncodedText
  )
  process {
    [System.Text.Encoding]::ASCII.GetString([System.Convert]::FromBase64String($EncodedText))
  }
}

Python Script

import random

import pymongo


def do_stuff():
    client = pymongo.MongoClient("mongodb://root:tcDMM5sqNC@127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000")
    col = client.my_data.my_users

    usernames = ["dovahkiin", "rey", "dey", "see", "mee", "rollin", "they", "hating"]
    hobbies = ["coding", "recording", "streaming", "batman", "footbal", "sports", "mathematics"]
    ages = [18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
    # times = [12, 14, 15, 23, 45, 32, 20]
    times = [47, 80, 93, 49, 96, 43]

    buffer = []
    for _ in range(1_000):
        first = random.choice(usernames).capitalize()
        mid = random.choice(usernames).capitalize()
        last = random.choice(usernames).capitalize()
        buffer.append(pymongo.InsertOne({
            "name": f"{first} '{mid}' {last}",
            "age": random.choice(ages),
            "hobbies": random.choice(hobbies),
            "t": random.choice(times)
        }))
    col.bulk_write(buffer)


if __name__ == '__main__':
    do_stuff()

References

https://bitnami.com/stack/mongodb-sharded/helm

https://docs.microsoft.com/en-us/azure/architecture/patterns/sharding

https://docs.mongodb.com/manual/core/zone-sharding/

https://docs.mongodb.com/manual/core/ranged-sharding/

https://docs.mongodb.com/manual/reference/method/sh.updateZoneKeyRange/

https://docs.mongodb.com/v5.0/core/sharding-choose-a-shard-key/

Blue vector created by starline – www.freepik.com

Docker basics for Developers

Introduction

Hello 🙋‍♂️

In this article we will discuss a tool called Docker 🐬

Docker is a platform which allows to package individual applications in containers. This achieves application isolation at the OS level without the need to use virtualization technologies by making use of the OS APIs.

Since it can be a little hard to get into Docker if you are new I will try to keep things short and concise.

A little example

Let us say you want to deploy two Applications on a Linux box, and one of them depends on imagemagick library version A and the other one depends on version B.

Since you can have only one version of the library installed at the same time you cannot deploy both applications. ☹

With Docker, you can package the Application and all its dependencies into containers. 📦

Something you can probably achieve without Docker as well if you try hard enough. But Docker makes this amazingly easy for us.

Once you’ve packaged your application you can publish it on  the Docker Hub so other people can make use of.

Let us recap:

  • 🐬Docker is a platform that empowers developers.
  • ☁Docker Hub is a Hub for sharing Docker containers, tools and plugins.
  • 📦Container is an abstraction at the application level.

Installing Docker ⚙

To install Docker, follow the official guide it is well written.

If you are on Windows 10 Home, I recommend you install WSL and WSL2 before installing Docker.

Install WSL on Windows 10 | Microsoft Docs

Packaging 📦

To package you will need a Dockerfile. The Dockerfile is a text document that contains all the steps needed to package the application into a container.

For a Golang application an example Dockerfile would look like this:

# Golang is our base images.
FROM golang:1.7

# Make a directory called simplFT
RUN mkdir -p /go/src/github.com/metonimie/simplFT/

# Copy the current dir contents into simplFT
ADD . /go/src/github.com/metonimie/simplFT/

# Set the working directory to simplFT
WORKDIR /go/src/github.com/metonimie/simplFT/

# Install dependencies
RUN go get "github.com/zyxar/image2ascii/ascii"
RUN go get "github.com/spf13/viper"

# Build the application
RUN go build ./main.go

# Run simplFT when the container launches
CMD ["./main", "-config-name", "docker-config"]

After you have a Dockerfile setup, you can build an image running: docker build . -f path_to_dockerfile -t my_app

Once the image is built you can use the image as a base for containers, and you can run as many as you would like: docker run -d  -p 8080:8080 -p 8081:8081 my_app

To see running containers you can write: docker ps

And to stop a container: docker stop container_id

You can also SSH into containers, mount volumes, expose ports, obtain logs and many more.

If you have secrets in the current directory!!!

Create a file called .dockerignore and include all files that contain sensitive information in it. If you do not, you may leak sensitive information into your Docker images.

Docker and Docker-Compose

Docker-compose is another tool which allows run recipes that involve multiple containers with ease.

If you are a Developer then your application will depend on other services like Nginx, Reddit, MongoDB, RabbitMQ and so on. Having to setup your DEV environment and install all these on your machine is painful a boring, and if you install the wrong version of MongoDB your application may crash.

Just like with Dockerfile you can create a docker-compose.yaml file, in which can reference third party images or your own Dockerfile.

A local development environment for WordPress would look like this:

version: "3.9"
    
services:
  db:
    image: mysql:5.7
    volumes:
      - db_data:/var/lib/mysql
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: somewordpress
      MYSQL_DATABASE: wordpress
      MYSQL_USER: wordpress
      MYSQL_PASSWORD: wordpress
    
  wordpress:
    depends_on:
      - db
    image: wordpress:latest
    volumes:
      - wordpress_data:/var/www/html
    ports:
      - "8000:80"
    restart: always
    environment:
      WORDPRESS_DB_HOST: db:3306
      WORDPRESS_DB_USER: wordpress
      WORDPRESS_DB_PASSWORD: wordpress
      WORDPRESS_DB_NAME: wordpress
volumes:
  db_data: {}
  wordpress_data: {}

You can run docker-compose up -d to start all services and docker-compose down to stop them.

The full example can be found at Quickstart: Compose and WordPress | Docker Documentation.

Thanks for reading! 🧾

Stay healthy and take care!