Kafka Connect GitHub source connector

Hello!

In this article we will discuss how to quickly get started with Kafka and Kafka Connect to grab all the commits from a GitHub repository.

This is a practical tutorial that should save you some time browsing Kafka’s documentation.

Environment

Kafka is a bit difficult to set up: you will need Kafka, ZooKeeper and Kafka Connect at the very least. To simplify the setup we’re going to use Docker and docker-compose.

Once you have Docker and docker-compose installed on your system, you can run a single command and get a Kafka environment for developing.

Let’s start Kafka by using the Confluent Platform repository. Make sure you have Docker, docker-compose and Git installed before proceeding.

git clone https://github.com/confluentinc/cp-all-in-one
cd cp-all-in-one/cp-all-in-one-community
docker-compose up

Once docker-compose up has started all the containers, Kafka and its related components should be up and running. To make sure all the components are running properly, open a new terminal window and compare your docker ps output with mine:

CONTAINER ID        IMAGE                                         COMMAND                  CREATED             STATUS                            PORTS                                                                      NAMES
fc95d4828397        confluentinc/ksqldb-examples:5.5.1            "bash -c 'echo Waiti…"   6 minutes ago       Up 6 minutes                                                                                                 ksql-datagen
28e67d645888        confluentinc/cp-ksqldb-cli:5.5.1              "/bin/sh"                6 minutes ago       Up 6 minutes                                                                                                 ksqldb-cli
371fd742ad1f        confluentinc/cp-ksqldb-server:5.5.1           "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes (healthy)            0.0.0.0:8088->8088/tcp                                                     ksqldb-server
caeb86c71308        cnfldemos/kafka-connect-datagen:0.3.2-5.5.0   "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes (health: starting)   0.0.0.0:8083->8083/tcp, 9092/tcp                                           connect
c4d90361c677        confluentinc/cp-kafka-rest:5.5.1              "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes                      0.0.0.0:8082->8082/tcp                                                     rest-proxy
c3752a0c1487        confluentinc/cp-schema-registry:5.5.1         "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes                      0.0.0.0:8081->8081/tcp                                                     schema-registry
14bfbef71822        confluentinc/cp-kafka:5.5.1                   "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes                      0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp, 0.0.0.0:29092->29092/tcp   broker
f2ddb8971efa        confluentinc/cp-zookeeper:5.5.1               "/etc/confluent/dock…"   6 minutes ago       Up 6 minutes                      2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp                                 zookeeper

If it looks similar, you’re ready to start configuring Kafka Connect.
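Comparing the two listings by eye works, but the check can also be scripted. Here is a minimal sketch, assuming the container names from my listing above and that the docker CLI is on your PATH. The helper names are made up for this example.

```python
import subprocess

# Container names taken from the `docker ps` listing in this article.
EXPECTED = {
    "broker", "zookeeper", "connect", "schema-registry",
    "rest-proxy", "ksqldb-server", "ksqldb-cli", "ksql-datagen",
}


def missing_containers(names_output: str, expected=EXPECTED) -> set:
    """Return the expected container names that are absent from the output."""
    running = {line.strip() for line in names_output.splitlines() if line.strip()}
    return set(expected) - running


def running_container_names() -> str:
    """Ask Docker for the names of the running containers, one per line."""
    result = subprocess.run(
        ["docker", "ps", "--format", "{{.Names}}"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


# Offline demonstration with a hand-written listing in which `connect` is down.
sample = "broker\nzookeeper\nschema-registry\nrest-proxy\nksqldb-server\nksqldb-cli\nksql-datagen\n"
print(sorted(missing_containers(sample)))  # prints ['connect']
```

Against a live environment you would call missing_containers(running_container_names()) and expect an empty set.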

Kafka Connect

Kafka Connect is a great tool for building data pipelines. We’re going to use it to get data from GitHub into Kafka.

We could write a simple Python producer to do that: query GitHub’s API and produce a record for Kafka using a client library. But Kafka Connect comes with additional benefits. It’s like Docker for producers and consumers; in fact, in Kafka Connect the producers are called source connectors and the consumers sink connectors.
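To make the comparison concrete, here is a rough sketch of what that hand-written Python producer might look like. It is only an illustration: the function names are mine, it fetches a single page of commits, and the actual Kafka call is left as a `send` callable so the example does not depend on a particular client library.

```python
import json
import urllib.request
from typing import Callable, Iterable, Tuple


def fetch_commits(repo: str, token: str) -> list:
    """Fetch one page of commits from GitHub's REST API (no pagination)."""
    request = urllib.request.Request(
        f"https://api.github.com/repos/{repo}/commits",
        headers={
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github+json",
        },
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def to_record(commit: dict) -> Tuple[bytes, bytes]:
    """Turn one commit into a (key, value) pair: the SHA keys the JSON body."""
    return commit["sha"].encode("utf-8"), json.dumps(commit).encode("utf-8")


def produce(commits: Iterable[dict], send: Callable[[bytes, bytes], None]) -> int:
    """Hand every commit to `send`, which should wrap a Kafka client's produce call."""
    count = 0
    for commit in commits:
        key, value = to_record(commit)
        send(key, value)
        count += 1
    return count


# Offline demonstration: collect the records in a list instead of sending them.
sent = []
produce([{"sha": "abc123", "commit": {"message": "Initial commit"}}],
        lambda key, value: sent.append((key, value)))
print(sent[0][0])  # prints b'abc123'
```

With the kafka-python client, for example, `send` could wrap KafkaProducer.send(topic, key=key, value=value). Pagination, rate limits, remembering where you left off and restarting after a failure would still be yours to write, which is the kind of plumbing a connector is meant to take off your hands.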

Installing the GitHub connector

To install the GitHub connector, which is supported by Confluent, you have to download it from Confluent Hub (click Download zip).

After the download has finished, unzip the files.

Before installing it, if you visit http://localhost:8083/connector-plugins you will see the currently installed connectors. There’s no GitHub connector yet.

To install it, we need to copy its folder into the Kafka Connect container and restart the container:

docker cp ./confluentinc-kafka-connect-github-1.0.1/. connect:/usr/share/java
docker restart connect

If we visit http://localhost:8083/connector-plugins again, we’ll see that the connector has been installed successfully:

{"class":"io.confluent.connect.github.GithubSourceConnector","type":"source","version":"1.0.1"},
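If you prefer to script this check rather than read the JSON in the browser, something along these lines should do. It is a small sketch that assumes Kafka Connect is reachable on localhost:8083 as in this setup; the function names are made up for the example.

```python
import json
import urllib.request

GITHUB_CONNECTOR = "io.confluent.connect.github.GithubSourceConnector"


def has_plugin(plugins: list, class_name: str = GITHUB_CONNECTOR) -> bool:
    """Return True if the /connector-plugins listing contains the given class."""
    return any(plugin.get("class") == class_name for plugin in plugins)


def installed_plugins(base_url: str = "http://localhost:8083") -> list:
    """Fetch the list of installed connector plugins from Kafka Connect."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as response:
        return json.load(response)


# Offline demonstration using the entry shown in this article.
sample = [{"class": GITHUB_CONNECTOR, "type": "source", "version": "1.0.1"}]
print(has_plugin(sample))  # prints True
```

Against the running container you would call has_plugin(installed_plugins()).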

Configuring the connector

There’s no configuration for the connector yet, so the connector isn’t running. To configure a connector you need to make a POST request to the configuration endpoint. You can have as many configurations as you like for the same connector; each configuration runs as a connector instance of its own, the catch being that each one needs a different name.

To view the already configured connectors we can visit http://localhost:8083/connectors/. As expected, an empty JSON array should be returned.

To configure the GitHub connector, I took the sample config from its documentation and adapted it into JSON format.

I’ve also chosen to use the JsonConverter instead of the AvroConverter for simplicity, and I’m getting the commits of the repo rather than the stargazers.

Here’s the config:

{
  "name": "MyGithubConnector",
  "config": {
    "connector.class": "io.confluent.connect.github.GithubSourceConnector",
    "confluent.topic.bootstrap.servers": "broker:29092",
    "tasks.max": "1",
    "confluent.topic.replication.factor": "1",
    "github.service.url": "https://api.github.com",
    "github.repositories": "apache/kafka",
    "github.tables": "commits",
    "github.since": "2019-01-01",
    "github.access.token": "YOUR_PERSONAL_ACCESS_TOKEN",
    "topic.name.pattern": "github-avro-${entityName}",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
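Since every configuration needs its own name, it can be handy to generate the JSON instead of copying it by hand. The sketch below reuses the exact keys from the config above; the helper name and the second connector's name are hypothetical, and "stargazers" is simply the table the documentation sample used.

```python
import json


def connector_config(name: str, repository: str, table: str, token: str) -> dict:
    """Build a GitHub source connector config with the same keys as above."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.github.GithubSourceConnector",
            "confluent.topic.bootstrap.servers": "broker:29092",
            "tasks.max": "1",
            "confluent.topic.replication.factor": "1",
            "github.service.url": "https://api.github.com",
            "github.repositories": repository,
            "github.tables": table,
            "github.since": "2019-01-01",
            "github.access.token": token,
            "topic.name.pattern": "github-avro-${entityName}",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        },
    }


# Two configurations for the same connector class, told apart by their names.
commits = connector_config("MyGithubConnector", "apache/kafka", "commits",
                           "YOUR_PERSONAL_ACCESS_TOKEN")
stargazers = connector_config("MyStargazersConnector", "apache/kafka", "stargazers",
                              "YOUR_PERSONAL_ACCESS_TOKEN")
print(json.dumps(commits, indent=2))
```

Writing the result to a file is then a matter of calling json.dump on it.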

Now we need to upload the config to Kafka Connect. I saved it as a file named github-connector-config.json and used curl to upload it. If the upload is successful, the server should echo back your config:

curl -X POST -H "Content-Type: application/json" --data @github-connector-config.json http://localhost:8083/connectors

{"name":"MyGithubConnector","config":{"connector.class":"io.confluent.connect.github.GithubSourceConnector","confluent.topic.bootstrap.servers":"broker:29092","tasks.max":"1","confluent.topic.replication.factor":"1","github.service.url":"https://api.github.com","github.repositories":"apache/kafka","github.tables":"commits","github.since":"2019-01-01","github.access.token":"xxx","topic.name.pattern":"github-avro-${entityName}","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","name":"MyGithubConnector"},"tasks":[],"type":"source"}
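The same upload can be done from Python with nothing but the standard library. This is a sketch under the same assumptions as the curl command (Kafka Connect on localhost:8083, the config saved as github-connector-config.json); the function names are my own.

```python
import json
import urllib.request


def create_connector_request(config: dict,
                             base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build the POST request that registers a connector configuration."""
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(config_path: str = "github-connector-config.json") -> dict:
    """Read the saved config, upload it and return the server's JSON reply."""
    with open(config_path) as handle:
        config = json.load(handle)
    with urllib.request.urlopen(create_connector_request(config)) as response:
        return json.load(response)


# Offline demonstration: build the request without sending it.
request = create_connector_request({"name": "MyGithubConnector", "config": {}})
print(request.get_method(), request.full_url)  # prints POST http://localhost:8083/connectors
```

Note that urlopen raises urllib.error.HTTPError on a non-2xx reply, which is what you should expect if a connector with the same name already exists.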

Visiting http://localhost:8083/connectors/ now lists our GitHub connector:

["MyGithubConnector"]

If we check its status (http://localhost:8083/connectors/MyGithubConnector/status) we should see that the connector is already running:

{"name":"MyGithubConnector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
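When reading the status, keep in mind that the connector and each of its tasks report their own state, so it is worth checking both. A small sketch, assuming the response shape shown above (the function names are hypothetical):

```python
import json
import urllib.request


def is_healthy(status: dict) -> bool:
    """True only if the connector and every one of its tasks are RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )


def connector_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Fetch the status document for one connector from Kafka Connect."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


# Offline demonstration using the status reply shown in this article.
sample = json.loads(
    '{"name":"MyGithubConnector","connector":{"state":"RUNNING","worker_id":"connect:8083"},'
    '"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}'
)
print(is_healthy(sample))  # prints True
```

I treat an empty task list as unhealthy here because a source connector without tasks is not pulling anything; adjust that if it does not fit your case.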

The connector will pull all the commits it can get; in the 5 minutes I let it run, it pulled over 2.5k GitHub commits.

I will pause it in order to preserve disk space:

curl -X PUT localhost:8083/connectors/MyGithubConnector/pause
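Pausing is reversible: Kafka Connect's REST API has a matching resume endpoint. Here is a sketch of both calls in Python, assuming the same localhost:8083 address; the helper name is made up.

```python
import urllib.request


def lifecycle_request(name: str, action: str,
                      base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build the PUT request that pauses or resumes a connector."""
    if action not in ("pause", "resume"):
        raise ValueError(f"unsupported action: {action}")
    return urllib.request.Request(f"{base_url}/connectors/{name}/{action}", method="PUT")


# Offline demonstration: build both requests without sending them.
for action in ("pause", "resume"):
    request = lifecycle_request("MyGithubConnector", action)
    print(request.get_method(), request.full_url)
```

Passing either request to urllib.request.urlopen sends it.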

Inspecting the data in Kafka

To inspect the data I use a tool called kafkacat.

Running kafkacat -C -b localhost:9092 -t github-avro-commits should show you the data collected from GitHub:

...
"commit":{ "author":{ "name":"hejiefang", "email":"xxxx", "date":1546437651000 }, "committer":{ "name":"Manikumar Reddy", "email":"xxxx", "date":1546437651000 }, "message":"MINOR: Fix doc format in upgrade notes\n\nAuthor: hejiefang <he.jiefang@zte.com.cn>\n\nReviewers: Srinivas <xxxx>, Manikumar Reddy <xxxx>\n\nCloses #6076 from hejiefang/modifynotable", "tree":{ "sha":"91db6646f829d6636011d09fdc8643e36280716b", "url":"https://api.github.com/repos/apache/kafka/git/trees/91db6646f829d6636011d09fdc8643e36280716b" }, "url":"https://api.github.com/repos/apache/kafka/git/commits/ffd6f2a2e8a573695d0c1c98e663f0b8198b1b6d", "comment_count":0, "verification":{ "verified":false, "reason":"unsigned", "signature":null, "payload":null } },
...
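The raw records are hard to read, and the dates are stored as epoch milliseconds. A short sketch of how the "commit" object from the excerpt above could be turned into a one-line summary; it assumes you have already extracted that object from the record, and the function name is mine.

```python
from datetime import datetime, timezone


def summarize_commit(commit: dict) -> str:
    """Format a commit as '<UTC timestamp> <author>: <first line of the message>'."""
    author = commit["author"]
    # The connector stores dates as milliseconds since the Unix epoch.
    when = datetime.fromtimestamp(author["date"] / 1000, tz=timezone.utc)
    subject = (commit["message"].splitlines() or [""])[0]
    return f"{when.isoformat()} {author['name']}: {subject}"


# Values copied from the record shown above.
sample = {
    "author": {"name": "hejiefang", "email": "xxxx", "date": 1546437651000},
    "message": "MINOR: Fix doc format in upgrade notes\n\nAuthor: hejiefang",
}
print(summarize_commit(sample))
# prints 2019-01-02T14:00:51+00:00 hejiefang: MINOR: Fix doc format in upgrade notes
```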

Summary

We’ve successfully set up a simple Kafka playground with docker-compose and installed a source connector for GitHub. It was fairly easy, and Kafka Connect’s REST API is quite powerful and user friendly.

In the next article we will take the commit data and index it in Elasticsearch using a sink connector.

Thanks for reading!

Bonus

You can debug the connector by modifying the logging level:

# Set the connector's logging level to TRACE
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/io.confluent.connect.github.GithubSourceConnector \
-d '{"level": "TRACE"}' \
| jq '.'

How to use Windows 10’s Wireless Display feature

Hi,

In this article I will show you how to use Windows 10’s wireless display feature to turn your laptop or Xbox into a wireless display.

Initial Setup

First things first: let’s enable network discoverability on both PCs. Open Control Panel, go to Network and Internet, then Network and Sharing Center, and click Change advanced sharing settings.

Then expand the Private tab and use the same settings as I do; only enable file sharing if you need it.

Turning on file and printer sharing is optional.

Now that this is done, we want to mark our home Wi-Fi network as Private. This way Windows will trust it and network discovery will be enabled.

Note: When connecting to public networks it’s a good idea to mark them as Public and keep network discovery off, since attackers may use this feature for malicious purposes.

To mark your network as Private, type Network Status in the search box, click Properties and make sure that Private is selected.

That’s it! 🎉🎉

Projecting to this PC

Open search and type Projection Settings. You’ll notice a message prompting you to install the Wireless Display optional Windows feature; click that link, type “Display” and install it.

You can leave the settings as they are. This means that your devices can only project when you open the Connect application on your PC or the Wireless Display application on your Xbox.

Click Launch the Connect app… on your laptop to start projecting from your PC.

Your laptop should now appear as a wireless display when you try to project from a PC on the same network.

Bonus: Xbox

You can project from your Windows computers to your Xbox by installing the Wireless Display application on it and launching it. The devices should be connected to the same network and network discovery should be on.

That’s it, thanks for reading!

If you want me to write about something, let me know! 🙂

LeetCode: Flood Fill

Hello,

Here’s my solution for the flood fill problem, found on LeetCode.

If you want me to write about certain topics please let me know in the comments, thank you!

Link to the problem: https://leetcode.com/problems/flood-fill/

"""
An image is represented by a 2-D array of integers, each integer representing the pixel
value of the image (from 0 to 65535).

Given a coordinate (sr, sc) representing the starting pixel (row and column) of the flood fill,
and a pixel value newColor, "flood fill" the image.

To perform a "flood fill", consider the starting pixel, plus any pixels connected 4-directionally
to the starting pixel of the same color as the starting pixel, plus any pixels connected 4-directionally
to those pixels (also with the same color as the starting pixel), and so on. Replace the color of all
of the aforementioned pixels with the newColor.

At the end, return the modified image.
"""
from typing import List, Tuple


class Solution:
    def __init__(self):
        self.visited = set()

    def _set_pixel(self, image: List[List[int]], point: Tuple[int, int], value: int):
        try:
            image[point[0]][point[1]] = value
        except IndexError:
            pass

    def _get_pixel(self, image: List[List[int]], point: Tuple[int, int]):
        if point[0] < 0 or point[1] < 0:
            return None
        try:
            return image[point[0]][point[1]]
        except IndexError:
            return None

    def _floodFill(self, image: List[List[int]], point: Tuple[int, int], color: int, newColor: int) -> None:
        # Recolor `point` if it still has the starting color, then recurse into its 4 neighbours.
        pixel = self._get_pixel(image, point)
        if pixel is not None and pixel == color and point not in self.visited:
            self.visited.add(point)
            self._set_pixel(image, point, newColor)
            self._floodFill(image, (point[0], point[1] + 1), color, newColor)
            self._floodFill(image, (point[0], point[1] - 1), color, newColor)
            self._floodFill(image, (point[0] + 1, point[1]), color, newColor)
            self._floodFill(image, (point[0] - 1, point[1]), color, newColor)

    def floodFill(self, image: List[List[int]], sr: int, sc: int, newColor: int) -> List[List[int]]:
        point = (sr, sc)
        pixel = self._get_pixel(image, point)
        self.visited = set()
        if pixel is not None:
            self._floodFill(image, point, pixel, newColor)
        return image

if __name__ == '__main__':
    s = Solution()

    out = s.floodFill([[1,1,1],[1,1,0],[1,0,1]], 1, 1, 2)
    print("Output", out)
    assert out == [[2,2,2],[2,2,0],[2,0,1]]

    out = s.floodFill([[0,0,0],[0,0,0]], 0, 0, 2)
    print("Output", out)
    assert out == [[2,2,2],[2,2,2]]

    out = s.floodFill([[0,0,0],[0,1,1]], 1, 1, 1)
    print("Output", out)
    assert out == [[0, 0, 0], [0, 1, 1]]
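
One thing to be aware of: the recursive solution goes one call deeper for every pixel it fills along a path, so a large enough connected region can exceed Python's default recursion limit. As an alternative, here is a sketch of the same fill with an explicit stack instead of recursion. It is not part of my submitted solution, and the function name is just for this example.

```python
from typing import List


def flood_fill_iterative(image: List[List[int]], sr: int, sc: int,
                         new_color: int) -> List[List[int]]:
    """Flood fill using an explicit stack, so the call depth stays constant."""
    if not image or not image[0]:
        return image
    rows, cols = len(image), len(image[0])
    old_color = image[sr][sc]
    # Nothing to do if the region already has the target color; this check
    # also replaces the visited set, since recolored pixels never match again.
    if old_color == new_color:
        return image
    stack = [(sr, sc)]
    while stack:
        r, c = stack.pop()
        if 0 <= r < rows and 0 <= c < cols and image[r][c] == old_color:
            image[r][c] = new_color
            stack.extend(((r + 1, c), (r - 1, c), (r, c + 1), (r, c - 1)))
    return image


print(flood_fill_iterative([[1, 1, 1], [1, 1, 0], [1, 0, 1]], 1, 1, 2))
# prints [[2, 2, 2], [2, 2, 0], [2, 0, 1]]

# A single row of 5000 pixels, far wider than the default recursion limit allows.
wide = flood_fill_iterative([[0] * 5000], 0, 0, 7)
print(all(pixel == 7 for pixel in wide[0]))  # prints True
```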