Kafka Connect Basics
Hi, this article is about Kafka Connect!
Introduction
Kafka Connect is a tool for streaming data between Apache Kafka and other systems. It is distributed and scalable by default, and since it is a standardized framework, many connectors are already available.
Connectors move data between Kafka and an external system. There are two types of connectors:
- Source: Source connectors pull data from an existing system (e.g. MariaDB, PostgreSQL, S3, Jira) and stream it into one or more Kafka topics.
- Sink: Sink connectors read data from Kafka topics and write it to an external system, e.g. MongoDB, Snowflake, S3.
If you want to stream change data capture (CDC) events from your databases, Debezium provides connectors that do just that. A CDC stream is an append-only log of changes in a database; with it you can replicate or reconstruct the database, and you can also react to individual change events by processing them in an external system.
Kafka Connect can be deployed in standalone mode or in distributed mode as a cluster of workers.
It features a RESTful interface for interacting with it:
- configuring connectors
- starting, stopping, pausing connectors
- viewing connector status
- resetting connector offsets
It also allows you to apply various transformations on a message.
Apache Kafka has an amazing documentation section on Kafka Connect.
REST API
For reference, I’ve copied all the operations from the REST API documentation and put them into a table.
| Method | Path | Description |
|---|---|---|
| GET | /connectors | Return a list of active connectors. |
| POST | /connectors | Create a new connector. |
| GET | /connectors/{name} | Get information about a specific connector. |
| DELETE | /connectors/{name} | Delete a connector. |
| GET | /connectors/{name}/config | Get the configuration parameters for a specific connector. |
| PUT | /connectors/{name}/config | Update the configuration parameters for a specific connector. |
| PATCH | /connectors/{name}/config | Patch the configuration parameters for a specific connector. |
| GET | /connectors/{name}/status | Get the current status of the connector. |
| GET | /connectors/{name}/tasks | Get a list of tasks currently running for a connector. |
| GET | /connectors/{name}/tasks/{taskId}/status | Get the current status of a task. |
| PUT | /connectors/{name}/pause | Pause the connector and its tasks, which stops message processing until the connector is resumed. |
| PUT | /connectors/{name}/stop | Stop the connector and shut down its tasks. |
| PUT | /connectors/{name}/resume | Resume a paused or stopped connector. |
| POST | /connectors/{name}/restart | Restart a connector and its task instances. |
| POST | /connectors/{name}/tasks/{taskId}/restart | Restart an individual task. |
| PUT | /connectors/{name}/topics/reset | Empty the set of active topics of a connector. |
| GET | /connectors/{name}/offsets | Get the current offsets for a connector. |
| DELETE | /connectors/{name}/offsets | Reset the offsets for a connector. |
| GET | /connector-plugins | Return a list of connector plugins installed in the Kafka Connect cluster. |
| GET | /connector-plugins/{plugin-type}/config | Get the configuration definition for the specified plugin. |
| PUT | /connector-plugins/{connector-type}/config/validate | Validate the provided configuration values against the configuration definition. |
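The read-only endpoints above are easy to script against. Here is a minimal sketch using only Python's standard library; the worker URL is an assumption (port 8083 is the Connect default), and the connector name in the comments is hypothetical:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed: Kafka Connect's default REST port

def status_path(name):
    """Build the path for a single connector's status endpoint."""
    return f"/connectors/{name}/status"

def connect_get(path):
    """GET a Connect REST path and decode the JSON reply (needs a running worker)."""
    with urllib.request.urlopen(CONNECT_URL + path) as resp:
        return json.load(resp)

# With a worker running you could then call, e.g.:
#   connect_get("/connectors")                 # list active connectors
#   connect_get(status_path("my-jdbc-source")) # one connector's status
#   connect_get("/connector-plugins")          # installed plugins
```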
To start a new connector instance, you typically POST a configuration body to /connectors:
{
  "name": "my-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "users",
    "poll.interval.ms": "5000",
    "topic.prefix": "pg.",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "maskSensitive",
    "transforms.maskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskSensitive.fields": "email,phone",
    "transforms.maskSensitive.replacement": "****"
  }
}
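Submitting such a configuration is a single POST. Here is a hedged sketch using only Python's standard library; the worker URL, connector name, and connection details are assumptions for illustration:

```python
import json
import urllib.request

def create_connector_request(name, config, base_url="http://localhost:8083"):
    """Build the POST /connectors request; the caller sends it with urlopen."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        base_url + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Hypothetical connector config, trimmed for brevity
req = create_connector_request(
    "my-jdbc-source",
    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    },
)
# urllib.request.urlopen(req)  # uncomment with a running Connect worker
```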
Converters
Converters are used by Connect to convert values from one type to another. Converters apply to both the Kafka message key and the Kafka message value. For example, take the following JSON message:
{"data": 1}
A string converter will write that message to the Kafka topic as a string, whereas a JSON converter will keep it as JSON. There are also binary-format converters such as Avro and Protobuf, which reduce message size by packing the message into a compact format. A downside of these formats is that you need the message schema in order to deserialize the data.
You can also write your own converter and load it into Kafka Connect.
To set the converters you use the following keys:
- key.converter: Sets the converter for the message key.
- value.converter: Sets the converter for the message value.
Here are some common converter classes:
- org.apache.kafka.connect.storage.StringConverter
- org.apache.kafka.connect.json.JsonConverter
- org.apache.kafka.connect.converters.ByteArrayConverter
- io.confluent.connect.json.JsonSchemaConverter (Requires schema registry)
- io.confluent.connect.protobuf.ProtobufConverter (Requires schema registry)
- io.confluent.connect.avro.AvroConverter (Requires schema registry)
And you usually set a converter with:
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "key.converter.schemas.enable": "true"
}
By setting key.converter.schemas.enable and value.converter.schemas.enable to true, you will receive the schema of the JSON message along with the payload.
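With schemas enabled, the JsonConverter wraps each message in an envelope containing a schema field next to the actual payload. A small sketch of what a consumer would see and how to unwrap it; the field names and types in the schema are made up for illustration:

```python
import json

# What a JsonConverter message looks like with schemas.enable=true:
# the schema travels inside every message, next to the payload.
raw = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [{"field": "data", "type": "int64", "optional": False}],
        "optional": False,
    },
    "payload": {"data": 1},
})

envelope = json.loads(raw)
payload = envelope["payload"]  # the original message: {"data": 1}
```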
Schema Registry
The schema registry is a separate component that acts as a central store for message schemas.
Binary formats like Avro or Protobuf cannot be decoded by their receiver without the message’s schema, and sending the schema with each message increases the message size.
The purpose of the schema registry is to keep all schemas together in one database and let producers and consumers fetch a schema only when needed, so that messages can be produced to the Kafka topic without including the schema.
This component is optional, and it’s only required when using binary formats like Avro or Protobuf.
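For reference, the registry itself exposes a small REST API for registering and fetching schemas. A hedged sketch of building a schema-registration request; the subject name, registry URL, and record schema are assumptions:

```python
import json
import urllib.request

def register_schema_request(subject, schema_str,
                            base_url="http://localhost:8081"):
    """Build the POST /subjects/{subject}/versions request for Schema Registry."""
    body = json.dumps({"schema": schema_str}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )

# Hypothetical Avro schema for a "users" topic's values
avro_schema = json.dumps({"type": "record", "name": "User",
                          "fields": [{"name": "id", "type": "long"}]})
req = register_schema_request("users-value", avro_schema)
# urllib.request.urlopen(req)  # uncomment with a running registry
```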
Transforms
You can apply various transformations on messages that are processed by the connector.
Common transforms include masking fields, dropping fields, replacing values, renaming fields and more.
- Cast - Cast fields or the entire key or value to a specific type
- DropHeaders - Remove headers by name
- ExtractField - Extract a specific field from Struct and Map and include only this field in results
- Filter - Removes messages from all further processing. This is used with a predicate to selectively filter certain messages
- Flatten - Flatten a nested data structure
- HeaderFrom - Copy or move fields in the key or value to the record headers
- HoistField - Wrap the entire event as a single field inside a Struct or a Map
- InsertField - Add a field using either static data or record metadata
- InsertHeader - Add a header using static data
- MaskField - Replace field with valid null value for the type (0, empty string, etc) or custom replacement (non-empty string or numeric value only)
- RegexRouter - Modify the topic of a record based on original topic, replacement string and a regular expression
- ReplaceField - Filter or rename fields
- SetSchemaMetadata - Modify the schema name or version
- TimestampConverter - Convert timestamps between different formats
- TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps
- ValueToKey - Replace the record key with a new key formed from a subset of fields in the record value
Source: https://kafka.apache.org/41/kafka-connect/user-guide/#transformations
To apply transforms, you include them in the connector config:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "transforms": "maskSensitive",
  "transforms.maskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskSensitive.fields": "sensitiveField",
  "transforms.maskSensitive.replacement": "****"
}
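Conceptually, that MaskField transform rewrites each record value before it reaches the topic. A rough Python equivalent of the masking step, as a sketch only, not Connect's actual implementation; the record contents are made up:

```python
def mask_fields(value, fields, replacement="****"):
    """Return a copy of the record value with the listed fields masked."""
    return {k: (replacement if k in fields else v) for k, v in value.items()}

# Hypothetical record value before and after the transform
record = {"id": 42, "email": "alice@example.com", "phone": "555-0100"}
masked = mask_fields(record, fields={"email", "phone"})
# masked == {"id": 42, "email": "****", "phone": "****"}
```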
Docker Compose
You can start a pre-configured Kafka Connect instance along with a single-broker Kafka cluster for development or experimentation using this docker-compose file:
services:
  broker:
    image: confluentinc/cp-kafka:8.0.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  # schema-registry:
  #   image: confluentinc/cp-schema-registry:8.0.0
  #   hostname: schema-registry
  #   container_name: schema-registry
  #   depends_on:
  #     - broker
  #   ports:
  #     - "8081:8081"
  #   environment:
  #     SCHEMA_REGISTRY_HOST_NAME: schema-registry
  #     SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
  #     SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: confluentinc/cp-kafka-connect:8.1.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      # - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
Additionally, you can download the Kafka binary archive and run Connect in standalone mode with bin/connect-standalone.sh config/connect-standalone.properties.
That’s all. I hope this article gave you a rough idea of Kafka Connect and its capabilities.
References
- https://kafka.apache.org/41/kafka-connect/user-guide/
- https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/