In this article I will give you some tips on how to improve the throughput of a message producer.
I had to write a Go-based application that consumes messages from Apache Kafka and sends them to a sink over HTTP, as JSON or Protocol Buffers.
To see if my idea would work, I started with a naive approach in which I polled Kafka for messages and then sent each message to the sink, one at a time. This worked, but it was slow.
To better understand the system, a colleague set up Grafana and a monitoring dashboard using the Prometheus metrics provided by the Sink. This allowed us to test various versions of the producer and observe their behavior.
Let’s explore what we can do to improve the throughput.
Request batching
A very important improvement is request batching.
Instead of sending one message per HTTP request, try to send several at once, if the sink allows it.
As you can see in the image, this simple idea improved the throughput from 100 msg/sec to ~4,000 msg/sec.
Batching is tricky: if your batches are too large, the receiver might be overwhelmed or the producer might have a tough time building them; if your batches contain only a few items, you might not see an improvement. Try to choose a batch size that is neither too high nor too low.
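Here's a rough sketch of what I mean in Go; the sink URL, the payload shape, and the batch size of 500 are made up for illustration and are not the actual Sink API:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// Message is a placeholder for whatever you consume from Kafka.
type Message struct {
    Key   string `json:"key"`
    Value string `json:"value"`
}

// pollMessages stands in for the Kafka consumer loop and just emits dummy data.
func pollMessages() <-chan Message {
    out := make(chan Message)
    go func() {
        defer close(out)
        for i := 0; i < 1200; i++ {
            out <- Message{Key: fmt.Sprint(i), Value: "payload"}
        }
    }()
    return out
}

// sendBatch POSTs a slice of messages as a single JSON array.
func sendBatch(client *http.Client, url string, batch []Message) error {
    body, err := json.Marshal(batch)
    if err != nil {
        return err
    }
    resp, err := client.Post(url, "application/json", bytes.NewReader(body))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode >= 300 {
        return fmt.Errorf("sink returned %s", resp.Status)
    }
    return nil
}

func main() {
    client := &http.Client{}
    const batchSize = 500 // tune this for your workload

    batch := make([]Message, 0, batchSize)
    flush := func() {
        if len(batch) == 0 {
            return
        }
        if err := sendBatch(client, "http://sink.local/bulk", batch); err != nil {
            fmt.Println("send failed:", err)
        }
        batch = batch[:0]
    }

    for msg := range pollMessages() {
        batch = append(batch, msg)
        if len(batch) == batchSize {
            flush()
        }
    }
    flush() // send whatever is left over
}

In the real producer the batch is built from Kafka messages and should also be flushed after a short timeout, so that slow topics don't hold data back indefinitely.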
Partitioning

There are several partitioning strategies that you can implement, depending on your tech stack.
Kafka assigns each partition to at most one consumer within a consumer group: if a topic has 3 partitions, you can run 3 consumer instances in parallel on that topic, all in the same consumer group. I did not use this approach because the Sink does not allow it; only one instance of the Producer may run at a time.
If you have multiple topics that you want to consume from, you can partition on the topic name or a topic name pattern by subscribing to multiple topics at once using a regex. For example, if each topic has 3 partitions, you can have 3 consumers consuming from highspeed.* and another 3 consuming from other.*.
Note: The standard build of librdkafka doesn't support negative lookahead regex expressions; if that's what you need, you will have to build the library from source, see issues/2769. It's easy to do, and the confluent-kafka-go client supports custom builds of librdkafka.
Negative lookahead expressions allow you to ignore some patterns, see this example for a better understanding: regex101.com/r/jZ9AEz/1
Source Parallelization
The confluent-kafka-go client allows you to poll Kafka for messages. Since polling is thread safe, it can be done in multiple goroutines or threads for a performance improvement.
package main

import (
    "fmt"
    "sync"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    var wg sync.WaitGroup

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        panic(err)
    }

    if err := c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil); err != nil {
        panic(err)
    }

    // Poll from several goroutines; wg.Add must happen before the goroutine starts.
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                msg, err := c.ReadMessage(-1)
                if err == nil {
                    fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
                    // TODO: Send data through a channel to be processed by another goroutine.
                } else {
                    // The client will automatically try to recover from all errors.
                    fmt.Printf("Consumer error: %v (%v)\n", err, msg)
                }
            }
        }()
    }

    wg.Wait()
    c.Close()
}
Buffered Go channels can also be used in this scenario to decouple polling from processing and improve performance, as shown in the sketch below.
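Here's a rough sketch of that idea; the channel capacity, the number of pollers, and the dummy messages are all made up for illustration, and in the real producer the pollers would call c.ReadMessage(-1) instead:

package main

import (
    "fmt"
    "sync"
)

func main() {
    // A buffered channel lets the pollers run ahead of the sender
    // without blocking on every single message.
    messages := make(chan string, 1000)

    var pollers sync.WaitGroup
    for i := 0; i < 5; i++ {
        pollers.Add(1)
        go func(id int) {
            defer pollers.Done()
            // In the real producer this would be a Kafka polling loop.
            for j := 0; j < 10; j++ {
                messages <- fmt.Sprintf("poller %d message %d", id, j)
            }
        }(i)
    }

    // Close the channel once all pollers are done.
    go func() {
        pollers.Wait()
        close(messages)
    }()

    // A single goroutine (here, main) drains the channel and could
    // batch messages before sending them to the sink.
    for m := range messages {
        fmt.Println("processing:", m)
    }
}

Sizing the buffer is its own trade-off: too small and the pollers block on every send, too large and you only hide backpressure while using more memory.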
Protocol Buffers
Finally, I saw a huge performance improvement when replacing the JSON body of the request with Protocol Buffers-encoded and snappy-compressed data.
If your Sink supports receiving Protocol Buffers, it is a good idea to try sending them instead of JSON.
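As a sketch of the approach, using google.golang.org/protobuf and github.com/golang/snappy; the message type, endpoint, and headers are placeholders I made up, since they depend on your schema and on what the Sink expects:

package main

import (
    "bytes"
    "fmt"
    "net/http"

    "github.com/golang/snappy"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/structpb"
)

func main() {
    // In the real producer you would marshal your own generated message type;
    // structpb.Struct is used here only to keep the example self-contained.
    msg, err := structpb.NewStruct(map[string]interface{}{
        "id":    42,
        "value": "hello",
    })
    if err != nil {
        panic(err)
    }

    // Encode to the Protocol Buffers wire format, then compress with snappy.
    raw, err := proto.Marshal(msg)
    if err != nil {
        panic(err)
    }
    compressed := snappy.Encode(nil, raw)

    // The endpoint and headers are illustrative; use whatever your sink expects.
    req, err := http.NewRequest(http.MethodPost, "http://sink.local/ingest", bytes.NewReader(compressed))
    if err != nil {
        panic(err)
    }
    req.Header.Set("Content-Type", "application/x-protobuf")
    req.Header.Set("Content-Encoding", "snappy")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("send failed:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("sink responded with", resp.Status)
}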
Honorable Mention: GZIP Compressed JSON
The Sink supported receiving GZIP compressed JSON, but in my case I didn’t see any notable performance improvements.
I compared the RAM and CPU usage of the Producer, the number of bytes sent over the network, and the message throughput. While there were improvements in some areas, I decided not to implement GZIP compression.
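For reference, compressing a JSON body with the standard library looks roughly like this; the endpoint is made up, and whether the Sink honors Content-Encoding: gzip depends on its API:

package main

import (
    "bytes"
    "compress/gzip"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    payload := []map[string]interface{}{
        {"id": 1, "value": "hello"},
        {"id": 2, "value": "world"},
    }

    body, err := json.Marshal(payload)
    if err != nil {
        panic(err)
    }

    // Compress the JSON body with gzip before sending it.
    var buf bytes.Buffer
    gz := gzip.NewWriter(&buf)
    if _, err := gz.Write(body); err != nil {
        panic(err)
    }
    if err := gz.Close(); err != nil {
        panic(err)
    }

    req, err := http.NewRequest(http.MethodPost, "http://sink.local/bulk", &buf)
    if err != nil {
        panic(err)
    }
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("Content-Encoding", "gzip")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("send failed:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("sink responded with", resp.Status)
}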
It’s all about trade-offs and needs.
Conclusion
As you can see, there are several things you can do to your producers in order to make them more efficient:
Request Batching
Fast JSON Libraries
Partitioning
Source Parallelization & Buffering
Protocol Buffers
Compression
I hope you’ve enjoyed this article and learned something! If you have some ideas, please let me know in the comments.
Yes, this post title is weird, but I want to take a moment to acknowledge that I'm very grateful to my dad for buying these computer speakers and much more.
In ~2002 my dad bought me my first computer, and at the same time a man approached us asking if we wanted to buy his second-hand speakers. I was about 5 or 6 years old at the time, and I still remember the moment.
I’m 25 years old now and these are the only computer speakers that I’ve used, they are almost 20 years old!
In this article I will explore the topic of sharding a MongoDB database that runs on Kubernetes. Before we get started, if you want to follow along, please install the tools listed in the prerequisites section, and if you want to learn more about sharding, check out this fantastic article: Sharding Pattern.
After the installation completes, save the database’s root password and replica set key. While doing this the first time I messed up and didn’t save them properly.
Run the following commands to print the password and replica set key on the command line. If you're on Windows, I have provided you with a PowerShell function for base64, and if you're on Unix, don't forget to pass --decode to base64.
kubectl get secret --namespace default my-release-mongodb-sharded -o jsonpath="{.data.mongodb-root-password}" | base64 --decode
kubectl get secret --namespace default my-release-mongodb-sharded -o jsonpath="{.data.mongodb-replica-set-key}" | base64 --decode
Sharding the Database
Verify that all your pods are running and start a shell connection to the mongos server.
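Something along these lines should get you a shell on the mongos router; the pod name depends on your Helm release, so look it up first:

kubectl get pods                                     # find the pod running mongos
kubectl exec -it <mongos-pod> -- mongosh -u root -p <root-password>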
To enable sharding on the database and collection, I'm going to insert some dummy data into the my_data database and the my_users collection. The script used to insert the data is attached at the end of this blog post.
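Enabling sharding itself boils down to two commands in the mongos shell; I'm showing them here as a sketch, using the database, collection, and shard key t described below:

sh.enableSharding("my_data")
sh.shardCollection("my_data.my_users", { t: 1 })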
If you’ve made it this far, congrats, you’ve enabled sharding, now let’s define some rules.
Since we're going to use a range sharding strategy based on the key t, and I have two shards available, I would like my data to be distributed between the two shards in the following way:
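Rules like that are expressed with zone ranges; here's a sketch, where the shard names come from the getShardDistribution output below and the boundary value of 1000 is purely an illustrative placeholder, not the value I actually used:

sh.addShardToZone("my-mongo-mongodb-sharded-shard-0", "low")
sh.addShardToZone("my-mongo-mongodb-sharded-shard-1", "high")
sh.updateZoneKeyRange("my_data.my_users", { t: MinKey }, { t: 1000 }, "low")
sh.updateZoneKeyRange("my_data.my_users", { t: 1000 }, { t: MaxKey }, "high")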
To test the rules, use the provided Python script: modify the times variable and run it with various values.
You can run db.my_users.getShardDistribution() to view the data distribution on the shards.
[direct: mongos]> db.my_users.getShardDistribution()
Shard my-mongo-mongodb-sharded-shard-0 at my-mongo-mongodb-sharded-shard-0/my-mongo-mongodb-sharded-shard0-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard0-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017
{
data: '144KiB',
docs: 1667,
chunks: 1,
'estimated data per chunk': '144KiB',
'estimated docs per chunk': 1667
}
Shard my-mongo-mongodb-sharded-shard-1 at my-mongo-mongodb-sharded-shard-1/my-mongo-mongodb-sharded-shard1-data-0.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017,my-mongo-mongodb-sharded-shard1-data-1.my-mongo-mongodb-sharded-headless.default.svc.cluster.local:27017
{
data: '195KiB',
docs: 2336,
chunks: 3,
'estimated data per chunk': '65KiB',
'estimated docs per chunk': 778
}
Adding More Shards
To add more shards to the cluster, all we need to do is run helm upgrade. If you don't mess up the replica set key like I did, it should work on the first run.
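The pod names above suggest the Bitnami mongodb-sharded chart, so the upgrade would look roughly like this; the release name, chart, and parameter names are my assumptions, so double-check them against your chart's values:

helm upgrade my-mongo bitnami/mongodb-sharded \
  --set shards=3 \
  --set auth.rootPassword=$ROOT_PASSWORD \
  --set auth.replicaSetKey=$REPLICA_SET_KEY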
After you save the correct password and replica set key, search for the volumes that belong to the shards with the wrong replica set key and delete them. In my case I only deleted the volumes belonging to the 3rd shard that I added; since counting starts from 0, I'm looking for shard2 in the name.
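In practice that's something like the following; double-check the PVC names before deleting anything:

kubectl get pvc                        # look for the volumes with shard2 in the name
kubectl delete pvc <shard2-pvc-name>   # repeat for every matching volume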
Sharding a MongoDB database can seem intimidating at first, but with some practice you can do it! If sharding doesn't work out for you, you can Convert Sharded Cluster to Replica Set, but be prepared with backups.