Kafka KRaft Cluster with Debezium Connector, Schema Registry, Kafka UI Configuration
2022-01-01

If you're looking to set up a robust Kafka cluster with Kafka Connect and Schema Registry for your data streaming needs, you're in the right place. In this article, we'll walk you through a Docker Compose configuration for Kafka 3.5, designed to provide a multi-broker Kafka cluster that's ready to handle your data pipeline requirements. We'll also explore the custom configurations used for Kafka Connect and Schema Registry.
Note: The configuration below doesn't need ZooKeeper; it uses Apache Kafka Raft (KRaft) instead.
Note: We've chosen not to use the Apicurio Schema Registry due to specific requirements involving automatic topic creation for Kafka foreign-key joins.
Note: This configuration needs at least 8 GB of RAM, so increase the memory limit in the Docker Desktop app accordingly.
Prerequisites
Before diving into the configuration, make sure you have Docker and Docker Compose installed on your system.
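A couple of quick checks from a terminal cover both prerequisites and the RAM note above. These assume the Compose v2 plugin (use docker-compose instead if you have the standalone v1 binary):

```bash
# Confirm Docker and Compose are installed
docker --version
docker compose version

# Total memory available to the Docker VM, in bytes (should be >= 8 GB)
docker info --format '{{.MemTotal}}'
```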
The Docker Compose Configuration
Here's a Docker Compose file for setting up a Kafka 3.5 cluster with three broker nodes, Kafka Connect, Schema Registry, ksqlDB, and a Kafka UI. Let's break down the key components and their configurations:
Kafka Broker Nodes
We configure three Kafka broker nodes: kafka-0, kafka-1, and kafka-2. Each broker is based on the Bitnami Kafka image and exposes ports 9092, 9093, and 9094, respectively.
Note: Because the default Debezium Docker image isn't bundled with the Confluent converter classes (such as AvroConverter), we use a custom Debezium Connect image, built on top of the base Debezium image, that includes them.
version: "3.2" services: kafka-0: image: docker.io/bitnami/kafka:3.5.1 ports: - "9092:9092" environment: - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093 - KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:29092,EXTERNAL://localhost:9092 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_JMX_PORT=19101 volumes: - kafka_0_data:/bitnami/kafka kafka-1: image: docker.io/bitnami/kafka:3.5.1 ports: - "9093:9093" environment: - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_NODE_ID=1 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-1:29092,EXTERNAL://localhost:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_JMX_PORT=19101 volumes: - kafka_1_data:/bitnami/kafka kafka-2: image: docker.io/bitnami/kafka:3.5.1 ports: - "9094:9094" environment: - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_NODE_ID=2 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093 - KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-2:29092,EXTERNAL://localhost:9094 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_JMX_PORT=19101 volumes: - kafka_2_data:/bitnami/kafka schema-registry: image: docker.io/bitnami/schema-registry:7.3 ports: - "8081:8081" depends_on: - kafka-0 - kafka-1 - kafka-2 environment: - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 - SCHEMA_REGISTRY_KAFKA_BROKERS=PLAINTEXT://kafka-0:29092,PLAINTEXT://kafka-1:29092,PLAINTEXT://kafka-2:29092 - SCHEMA_REGISTRY_HOST_NAME=schema-registry connect: image: olyanren/debezium-confluent:2.3.1 hostname: connect container_name: connect depends_on: - kafka-0 - kafka-1 - kafka-2 - schema-registry ports: - "8083:8083" environment: BOOTSTRAP_SERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092" ADVERTISED_PORT: 8083 ADVERTISED_HOST_NAME: "kafka-connect" GROUP_ID: compose-connect-group CONFIG_STORAGE_TOPIC: docker-connect-configs OFFSET_STORAGE_TOPIC: docker-connect-offsets STATUS_STORAGE_TOPIC: docker-connect-status ENABLE_SCHEMA_CONVERTERS: "true" KEY_CONVERTER: io.confluent.connect.avro.AvroConverter VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter" CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_AUTO-REGISTER: "true" 
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_FIND-LATEST: "true" CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter" CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_AUTO-REGISTER: "true" CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_FIND-LATEST: "true" CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE: "avro" LOG4J_ROOT_LOGLEVEL: "INFO" LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" CONFIG_STORAGE_REPLICATION_FACTOR: "3" OFFSET_STORAGE_REPLICATION_FACTOR: "3" STATUS_STORAGE_REPLICATION_FACTOR: "3" OFFSET_FLUSH_INTERVAL_MS: 60000 PLUGIN_PATH: '/usr/share/java' ksqldb-server: image: confluentinc/ksqldb-server:0.29.0 hostname: ksqldb-server container_name: ksqldb-server ports: - "8088:8088" environment: KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_BOOTSTRAP_SERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092" KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" KSQL_KSQL_SINK_REPLICAS: "3" KSQL_KSQL_STREAMS_REPLICATION_FACTOR: "3" KSQL_KSQL_INTERNAL_TOPIC_REPLICAS: "1" volumes: - ksql_data:/bitnami/ksql depends_on: - kafka-0 - kafka-1 - kafka-2 - schema-registry ksqldb-cli: image: confluentinc/ksqldb-cli:0.29.0 container_name: ksqldb-cli depends_on: - ksqldb-server entrypoint: /bin/sh tty: true kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui ports: - "8080:8080" environment: DYNAMIC_CONFIG_ENABLED : "true" KAFKA_CLUSTERS_0_NAME: Dpi4jdiOTyW0G3vV112ZWg KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092" KAFKA_CLUSTERS_0_METRICS_PORT: 19101 KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081 KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME: admin KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD: admin KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083 restart: always depends_on: - kafka-0 - kafka-1 - kafka-2 - schema-registry - connect volumes: kafka_0_data: driver: local kafka_1_data: driver: local kafka_2_data: driver: local ksql_data: driver: local
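With the file saved as docker-compose.yml, the whole stack comes up with one command. Here is a minimal smoke test, assuming the service and container names defined above:

```bash
# Start everything in the background
docker compose up -d

# List topics through the internal listener to confirm the brokers are up
docker compose exec kafka-0 kafka-topics.sh \
  --bootstrap-server kafka-0:29092 --list

# Optionally, open a ksqlDB shell against the ksqlDB server
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
```

Once the containers are healthy, the Kafka UI is available at http://localhost:8080, Schema Registry at http://localhost:8081, and Kafka Connect's REST API at http://localhost:8083.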
Custom Debezium Connector for SQL Server
To work with SQL Server, we've used a custom Debezium connector. You can register it using a POST request to Kafka Connect's REST API:
URL: http://localhost:8083/connectors
Method: POST
Content-Type: application/json
{ "name": "hitit-basit-belge-isletme-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "tasks.max": "1", "topic.prefix": "hitit.isletme.tis", "database.hostname": "localhost", "database.port": "1433", "database.user": "ktbHitit", "database.password": "1234", "database.names": "demo", "schema.history.internal.kafka.bootstrap.servers": "kafka-0:29092,kafka-1:29092,kafka-2:29092", "schema.history.internal.kafka.topic": "schema-changes.demo-internal", "database.encrypt": "false", "snapshot.mode": "initial", "snapshot.isolation.mode": "repeatable_read", "table.include.list": "dbo.users, dbo.roles, dbo.permissions", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "key.converter.schema.registry.auto-register": true, "key.converter.schema.registry.find-latest": true, "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081", "value.converter.schema.registry.auto-register": true, "value.converter.schema.registry.find-latest": true, "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy", "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter", "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter", "schema.name.adjustment.mode": "avro", "key.converter.schemas.enable": false, "value.converter.schemas.enable": false, "errors.tolerance": "all", "errors.deadletterqueue.topic.name": "dlq_hitit_isletme_tis", "errors.deadletterqueue.topic.replication.factor": 1, "transforms": "unwrap,createKey,extractInt", "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.createKey.fields": "Id", "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractInt.field": "Id", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "rewrite", "topic.creation.default.cleanup.policy": "delete", "topic.creation.default.partitions": "3", "topic.creation.default.replication.factor": "3" } }