Kafka KRaft Cluster with Debezium Connector, Schema Registry, and Kafka UI Configuration
2022-01-01

If you're looking to set up a robust Kafka cluster with Kafka Connect and Schema Registry for your data streaming needs, you're in the right place. In this article, we'll walk you through a Docker Compose configuration for Kafka 3.5, designed to provide a multi-broker Kafka cluster that's ready to handle your data pipeline requirements. We'll also explore the custom configurations used for Kafka Connect and Schema Registry.
Note: This configuration doesn't need ZooKeeper; it uses Apache Kafka Raft (KRaft) instead (see the verification snippet after these notes).
Note: We've chosen not to use the Apicurio Schema Registry due to specific requirements involving automatic topic creation for Kafka foreign-key joins.
Note: This configuration needs at least 8 GB of RAM, so increase the memory allocation in the Docker Desktop settings.
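Once the full stack defined below is running, you can confirm that the brokers have formed a KRaft quorum with no ZooKeeper involved. This is a minimal sketch, assuming the Bitnami image's standard CLI location and that you run it from the directory holding the Compose file:

# Describe the KRaft controller quorum from inside one of the brokers
docker compose exec kafka-0 /opt/bitnami/kafka/bin/kafka-metadata-quorum.sh \
  --bootstrap-server localhost:9092 describe --status

The output should list nodes 0, 1, and 2 as voters, with one of them acting as leader.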
Prerequisites
Before diving into the configuration, make sure you have Docker and Docker Compose installed on your system.
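If you're unsure, a quick check for both (Compose v2 ships as a Docker CLI plugin, so it's invoked as docker compose):

# Verify the Docker engine and the Compose plugin are installed
docker --version
docker compose version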
The Docker Compose Configuration
Here's a Docker Compose file for setting up a Kafka 3.5 cluster with three broker nodes, Kafka Connect, Schema Registry, ksqlDB, and a Kafka UI. Let's break down the key components and their configurations:
Kafka Broker Nodes
We configure three Kafka broker nodes: kafka-0, kafka-1, and kafka-2. Each is based on the Bitnami Kafka image, and they expose ports 9092, 9093, and 9094, respectively. Every node runs both the controller and broker roles and shares the same KAFKA_KRAFT_CLUSTER_ID, which is what lets the three of them form a single KRaft cluster.
Note: Because the default Debezium Docker image isn't bundled with the Confluent converter classes (such as AvroConverter), we use a custom Connect image built on top of the base Debezium image.
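If you want to confirm the converter jars are actually present in the custom image before relying on them, one way (the jar's location inside the image isn't fixed, hence the filesystem-wide search) is:

# Search the custom image for the Confluent Avro converter jar
docker run --rm --entrypoint bash olyanren/debezium-confluent:2.3.1 \
  -c 'find / -name "kafka-connect-avro-converter*.jar" 2>/dev/null'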
version: "3.2"
services:
kafka-0:
image: docker.io/bitnami/kafka:3.5.1
ports:
- "9092:9092"
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093
- KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:29092,EXTERNAL://localhost:9092
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_JMX_PORT=19101
volumes:
- kafka_0_data:/bitnami/kafka
kafka-1:
image: docker.io/bitnami/kafka:3.5.1
ports:
- "9093:9093"
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-1:29092,EXTERNAL://localhost:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_JMX_PORT=19101
volumes:
- kafka_1_data:/bitnami/kafka
kafka-2:
image: docker.io/bitnami/kafka:3.5.1
ports:
- "9094:9094"
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_NODE_ID=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093
- KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-2:29092,EXTERNAL://localhost:9094
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_JMX_PORT=19101
volumes:
- kafka_2_data:/bitnami/kafka
schema-registry:
image: docker.io/bitnami/schema-registry:7.3
ports:
- "8081:8081"
depends_on:
- kafka-0
- kafka-1
- kafka-2
environment:
- SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
- SCHEMA_REGISTRY_KAFKA_BROKERS=PLAINTEXT://kafka-0:29092,PLAINTEXT://kafka-1:29092,PLAINTEXT://kafka-2:29092
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
connect:
image: olyanren/debezium-confluent:2.3.1
hostname: connect
container_name: connect
depends_on:
- kafka-0
- kafka-1
- kafka-2
- schema-registry
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092"
ADVERTISED_PORT: 8083
      ADVERTISED_HOST_NAME: "connect" # must match the hostname other containers use to reach Connect
GROUP_ID: compose-connect-group
CONFIG_STORAGE_TOPIC: docker-connect-configs
OFFSET_STORAGE_TOPIC: docker-connect-offsets
STATUS_STORAGE_TOPIC: docker-connect-status
ENABLE_SCHEMA_CONVERTERS: "true"
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_KEY_CONVERTER_AUTO_REGISTER_SCHEMAS: "true"
      CONNECT_KEY_CONVERTER_USE_LATEST_VERSION: "true"
CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_VALUE_CONVERTER_AUTO_REGISTER_SCHEMAS: "true"
      CONNECT_VALUE_CONVERTER_USE_LATEST_VERSION: "true"
CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE: "avro"
LOG4J_ROOT_LOGLEVEL: "INFO"
LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONFIG_STORAGE_REPLICATION_FACTOR: "3"
OFFSET_STORAGE_REPLICATION_FACTOR: "3"
STATUS_STORAGE_REPLICATION_FACTOR: "3"
OFFSET_FLUSH_INTERVAL_MS: 60000
PLUGIN_PATH: '/usr/share/java'
ksqldb-server:
image: confluentinc/ksqldb-server:0.29.0
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_BOOTSTRAP_SERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092"
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SINK_REPLICAS: "3"
KSQL_KSQL_STREAMS_REPLICATION_FACTOR: "3"
KSQL_KSQL_INTERNAL_TOPIC_REPLICAS: "1"
volumes:
- ksql_data:/bitnami/ksql
depends_on:
- kafka-0
- kafka-1
- kafka-2
- schema-registry
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.29.0
container_name: ksqldb-cli
depends_on:
- ksqldb-server
entrypoint: /bin/sh
tty: true
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
environment:
      DYNAMIC_CONFIG_ENABLED: "true"
KAFKA_CLUSTERS_0_NAME: Dpi4jdiOTyW0G3vV112ZWg
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092"
KAFKA_CLUSTERS_0_METRICS_PORT: 19101
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME: admin
KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD: admin
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083
restart: always
depends_on:
- kafka-0
- kafka-1
- kafka-2
- schema-registry
- connect
volumes:
kafka_0_data:
driver: local
kafka_1_data:
driver: local
kafka_2_data:
driver: local
ksql_data:
driver: local
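With the file saved as docker-compose.yml, the whole stack comes up with one command. The topic listing below is a sketch that assumes the Bitnami CLI location; after Connect finishes starting, its three internal topics (docker-connect-configs, docker-connect-offsets, docker-connect-status) should appear, and the Kafka UI becomes available at http://localhost:8080.

# Start the stack in the background and check container status
docker compose up -d
docker compose ps

# List topics over the internal listener; Connect's storage topics should be present
docker compose exec kafka-0 /opt/bitnami/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-0:29092 --list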
Custom Debezium Connector for SQL Server
To capture changes from SQL Server, we register a Debezium SQL Server connector (running on the custom Connect image above) with a POST request to Kafka Connect's REST API. One caveat: database.hostname is resolved from inside the connect container, so if SQL Server runs on your host machine under Docker Desktop, use host.docker.internal instead of localhost.
URL: http://localhost:8083/connectors
Method: POST
Content-Type: application/json
{
"name": "hitit-basit-belge-isletme-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"topic.prefix": "hitit.isletme.tis",
"database.hostname": "localhost",
"database.port": "1433",
"database.user": "ktbHitit",
"database.password": "1234",
"database.names": "demo",
"schema.history.internal.kafka.bootstrap.servers": "kafka-0:29092,kafka-1:29092,kafka-2:29092",
"schema.history.internal.kafka.topic": "schema-changes.demo-internal",
"database.encrypt": "false",
"snapshot.mode": "initial",
"snapshot.isolation.mode": "repeatable_read",
"table.include.list": "dbo.users, dbo.roles, dbo.permissions",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schema.registry.auto-register": true,
"key.converter.schema.registry.find-latest": true,
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.auto-register": true,
"value.converter.schema.registry.find-latest": true,
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.name.adjustment.mode": "avro",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_hitit_isletme_tis",
"errors.deadletterqueue.topic.replication.factor": 1,
"transforms": "unwrap,createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "Id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "Id",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"topic.creation.default.cleanup.policy": "delete",
"topic.creation.default.partitions": "3",
"topic.creation.default.replication.factor": "3"
}
}
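Saving the payload above to a file (register-sqlserver.json is just an example name) lets you register and then monitor the connector from the shell:

# Register the connector with the Connect REST API
curl -i -X POST -H "Content-Type: application/json" \
  --data @register-sqlserver.json \
  http://localhost:8083/connectors

# Confirm the connector and its task report RUNNING
curl -s http://localhost:8083/connectors/hitit-basit-belge-isletme-connector/status

# Once snapshots flow, the Avro subjects show up in Schema Registry
curl -s http://localhost:8081/subjects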