
Hi, this article is about Kafka Connect!

Introduction

Kafka Connect is a tool for streaming data between Kafka and other systems. It is distributed and scalable by default, and since it is a standardized tool, there are lots of connectors already available.

Connectors connect Kafka to a system or vice versa. There are two types of connectors:

  • Source: Source connectors grab data from an existing system, e.g. MariaDB, PostgreSQL, S3, or Jira, and stream it into one or more Kafka topics.
  • Sink: Sink connectors grab data from Kafka topics and ingest it into another system, e.g. MongoDB, Snowflake, or S3.

If you want to stream change data capture (CDC) events from your databases, Debezium provides connectors that let you do just that. CDC is an append-only log that records changes in a database; using a CDC stream you can replicate or reconstruct a database, and you can also react to events by processing them in an external system.

Kafka Connect can be deployed in standalone mode or distributed as a cluster of workers.

It features a RESTful interface for interacting with it:

  • configuring connectors
  • starting, stopping, pausing connectors
  • viewing connector status
  • resetting connector offsets

It also allows you to apply various transformations to a message.

Apache Kafka has an amazing documentation section on Kafka Connect.

REST API

For reference, I’ve copied all the operations from the REST API documentation and put them into a table.

| Method | Path | Description |
| ------ | ---- | ----------- |
| GET | /connectors | return a list of active connectors |
| POST | /connectors | create a new connector |
| GET | /connectors/{name} | get information about a specific connector |
| DELETE | /connectors/{name} | delete a connector |
| GET | /connectors/{name}/config | get the configuration parameters for a specific connector |
| PUT | /connectors/{name}/config | update the configuration parameters for a specific connector |
| PATCH | /connectors/{name}/config | patch the configuration parameters for a specific connector |
| GET | /connectors/{name}/status | get the current status of the connector |
| GET | /connectors/{name}/tasks | get a list of tasks currently running for a connector |
| GET | /connectors/{name}/tasks/{taskid}/status | get the current status of the task |
| PUT | /connectors/{name}/pause | pause the connector and its tasks, which stops message processing until the connector is resumed |
| PUT | /connectors/{name}/stop | stop the connector and shut down its tasks |
| PUT | /connectors/{name}/resume | resume a paused or stopped connector |
| POST | /connectors/{name}/restart | restart a connector and its task instances |
| POST | /connectors/{name}/tasks/{taskId}/restart | restart an individual task |
| PUT | /connectors/{name}/topics/reset | send a request to empty the set of active topics of a connector |
| GET | /connectors/{name}/offsets | get the current offsets for a connector |
| DELETE | /connectors/{name}/offsets | reset the offsets for a connector |
| GET | /connector-plugins | return a list of connector plugins installed in the Kafka Connect cluster |
| GET | /connector-plugins/{plugin-type}/config | get the configuration definition for the specified plugin |
| PUT | /connector-plugins/{connector-type}/config/validate | validate the provided configuration values against the configuration definition |
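Assuming a Connect worker is listening on localhost:8083 (the default REST port) and a connector named my-jdbc-source exists (a hypothetical name for illustration), the operations above map to plain curl calls:

```shell
# list active connectors
curl -s http://localhost:8083/connectors

# check the status of a connector and its tasks
curl -s http://localhost:8083/connectors/my-jdbc-source/status

# restart the connector together with its tasks
curl -s -X POST "http://localhost:8083/connectors/my-jdbc-source/restart?includeTasks=true"
```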

To start a new connector instance, you would usually POST to /connectors with a config body:

{
  "name": "my-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",

    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "myuser",
    "connection.password": "mypassword",

    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "users",
    "poll.interval.ms": "5000",

    "topic.prefix": "pg.",

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "transforms": "maskSensitive",
    "transforms.maskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskSensitive.fields": "email,phone",
    "transforms.maskSensitive.replacement": "****"
  }
}
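If you save the config above as my-jdbc-source.json, you can submit it to a worker assumed to be running on localhost:8083:

```shell
# create the connector from the JSON config file
curl -s -X POST \
  -H "Content-Type: application/json" \
  --data @my-jdbc-source.json \
  http://localhost:8083/connectors
```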

Converters

Converters are used by Connect to convert values from one type to another. Converters apply to both the Kafka message key and the Kafka message value. For example, if you have the following JSON message:

{"data": 1}

A string converter will write that message to the Kafka topic as a string, whereas a JSON converter will keep it as JSON. There are also binary converters such as Avro and Protobuf that help reduce the message size by packing the message into a compact format. A downside of these formats is that you need the message schema in order to deserialize the message.

You can also write your own converter and load it into Kafka Connect.

To set the converters you use the following keys:

  • key.converter: Sets the converter for the message key.
  • value.converter: Sets the converter for the message value.

Here are some common converter classes:

  • org.apache.kafka.connect.storage.StringConverter
  • org.apache.kafka.connect.json.JsonConverter
  • org.apache.kafka.connect.converters.ByteArrayConverter
  • io.confluent.connect.json.JsonSchemaConverter (Requires schema registry)
  • io.confluent.connect.protobuf.ProtobufConverter (Requires schema registry)
  • io.confluent.connect.avro.AvroConverter (Requires schema registry)

And you usually set a converter with:

{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "key.converter.schemas.enable": "true"
}

If you also set value.converter.schemas.enable to true, you will receive the schema of the JSON message along with the payload.
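For illustration, with schemas.enable set to true the JsonConverter wraps each message in an envelope containing both the schema and the payload; the {"data": 1} message from earlier would come out roughly like this:

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": true, "field": "data" }
    ],
    "optional": false
  },
  "payload": { "data": 1 }
}
```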

Schema Registry

The schema registry is another component that acts as a central store for message schemas.

Binary formats like Avro or Protobuf cannot be decoded by their receiver without the message’s schema, and sending the schema with each message increases the message size.

The purpose of the schema registry is to keep all schemas together in a database and let producers and consumers request a schema only when needed, so that messages can be produced to the Kafka topic without including the schema.

This component is optional, and it’s only required when using binary formats like Avro or Protobuf.
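The Confluent Schema Registry also exposes a REST API of its own. Assuming it runs on localhost:8081, you can browse the registered schemas with curl (the subject name below follows the default topic-name strategy and is hypothetical):

```shell
# list all registered subjects
curl -s http://localhost:8081/subjects

# fetch the latest schema registered for the value of topic pg.users
curl -s http://localhost:8081/subjects/pg.users-value/versions/latest
```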

Transforms

You can apply various transformations on messages that are processed by the connector.

Common transforms include masking fields, dropping fields, replacing values, renaming fields and more.

  • Cast - Cast fields or the entire key or value to a specific type
  • DropHeaders - Remove headers by name
  • ExtractField - Extract a specific field from Struct and Map and include only this field in results
  • Filter - Removes messages from all further processing. This is used with a predicate to selectively filter certain messages
  • Flatten - Flatten a nested data structure
  • HeaderFrom - Copy or move fields in the key or value to the record headers
  • HoistField - Wrap the entire event as a single field inside a Struct or a Map
  • InsertField - Add a field using either static data or record metadata
  • InsertHeader - Add a header using static data
  • MaskField - Replace field with valid null value for the type (0, empty string, etc) or custom replacement (non-empty string or numeric value only)
  • RegexRouter - Modify the topic of a record based on the original topic, a replacement string and a regular expression
  • ReplaceField - Filter or rename fields
  • SetSchemaMetadata - Modify the schema name or version
  • TimestampConverter - Convert timestamps between different formats
  • TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps
  • ValueToKey - Replace the record key with a new key formed from a subset of fields in the record value

Source: https://kafka.apache.org/41/kafka-connect/user-guide/#transformations

To apply transforms you would include them into the connector config:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "transforms": "maskSensitive",
  "transforms.maskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskSensitive.fields": "sensitiveField",
  "transforms.maskSensitive.replacement": "****"
}
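To make the effect concrete, here is a sketch of what the maskSensitive transform above does to a record value (illustrative data):

```
# value before the transform
{"id": 42, "sensitiveField": "super-secret"}

# value after MaskField$Value with replacement "****"
{"id": 42, "sensitiveField": "****"}
```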

Docker Compose

You can start a pre-configured Kafka Connect instance along with a Kafka cluster for development or playing around using this docker-compose file.

services:

  broker:
    image: confluentinc/cp-kafka:8.0.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

#  schema-registry:
#    image: confluentinc/cp-schema-registry:8.0.0
#    hostname: schema-registry
#    container_name: schema-registry
#    depends_on:
#      - broker
#    ports:
#      - "8081:8081"
#    environment:
#      SCHEMA_REGISTRY_HOST_NAME: schema-registry
#      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
#      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: confluentinc/cp-kafka-connect:8.1.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
#      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
#      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
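With the file saved as docker-compose.yml, you can bring the stack up and check that the Connect worker answers on its REST port:

```shell
# start the broker and the Connect worker in the background
docker compose up -d

# once the worker has started, list the installed connector plugins
curl -s http://localhost:8083/connector-plugins
```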

Alternatively, you can download the Kafka binary archive and run Connect in standalone mode with bin/connect-standalone.sh config/connect-standalone.properties.

That’s all! I hope this article gave you a rough idea of Kafka Connect and its capabilities.

References

  • https://kafka.apache.org/41/kafka-connect/user-guide/
  • https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/