KRaft Kafka Cluster with Docker

Setting up a Kafka cluster in KRaft mode using a minimal Docker configuration and integrating a schema registry with the cluster.

Razvan Badescu
Level Up Coding


In this article, I will guide you through setting up a Kafka cluster in KRaft mode using a minimal Docker configuration. Additionally, we will explore the integration of the Schema Registry with our cluster. If you’re new to KRaft and want to gain a comprehensive understanding of its architecture, I highly recommend reading my in-depth article on the subject: The Next Generation Kafka Architecture.

Docker Setup

The following setup is a three-node Kafka cluster operating in KRaft mode, where every node is a mixed node (acting as both broker and controller), with Schema Registry as a client.

version: '3'
services:
  kafka1:
    image: confluentinc/cp-kafka
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka1:29092,CONTROLLER://kafka1:29093,EXTERNAL://0.0.0.0:9092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka1:29092,EXTERNAL://localhost:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka2:
    image: confluentinc/cp-kafka
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka2:29092,CONTROLLER://kafka2:29093,EXTERNAL://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka2:29092,EXTERNAL://localhost:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka3:
    image: confluentinc/cp-kafka
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka3:29092,CONTROLLER://kafka3:29093,EXTERNAL://0.0.0.0:9094'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka3:29092,EXTERNAL://localhost:9094'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  schema-registry:
    image: confluentinc/cp-schema-registry
    container_name: schema-registry
    hostname: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka1:29092,kafka2:29092,kafka3:29092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    depends_on:
      - kafka1
      - kafka2
      - kafka3

networks:
  default:
    name: my-network
    external: true

Before running the setup, create the network “my-network” by running “docker network create my-network”, and generate a value for “CLUSTER_ID”.
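If you prefer not to use an online generator, the shape of a cluster ID can be reproduced with a few lines of Python. This is only a sketch under the assumption that a cluster ID is a random 16-byte UUID encoded as URL-safe base64; the function name random_cluster_id is my own, and Kafka's own kafka-storage tool remains the authoritative way to generate one. As far as I know, that tool emits the unpadded 22-character form, so the sketch strips the trailing "=" padding.

```python
import base64
import uuid


def random_cluster_id() -> str:
    """Return a random 16-byte UUID as unpadded URL-safe base64 (22 characters).

    Assumption: this mirrors the general shape of a Kafka cluster ID; it is not
    a reimplementation of Kafka's own generator.
    """
    raw = uuid.uuid4().bytes  # 16 bytes
    encoded = base64.urlsafe_b64encode(raw)  # 24 characters, ending in "=="
    return encoded.rstrip(b"=").decode("ascii")


if __name__ == "__main__":
    print(random_cluster_id())
```

The printed value can then be pasted into the CLUSTER_ID entry of every Kafka node, since all nodes of one cluster must share the same ID.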

For adding more components to the setup, I recommend exploring Confluent’s cp-all-in-one-kraft setup.

Kafka Node Configuration

The Kafka nodes are configured with the required properties for KRaft:

  • KAFKA_NODE_ID — The 32-bit ID uniquely identifying the Kafka node.
  • KAFKA_CONTROLLER_LISTENER_NAMES — Comma-separated list of the names of the listeners used by the controller. This property is required in KRaft mode.
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP — Maps listener names to the corresponding security protocols. We assigned three listeners (CONTROLLER, INTERNAL, and EXTERNAL) to the un-authenticated and non-encrypted PLAINTEXT security protocol.
  • KAFKA_LISTENERS — The address on which the server listens. CONTROLLER is for controller quorum communication, INTERNAL is for internal communication between DATA PLANE brokers (our mixed nodes belong to both DATA PLANE and CONTROL PLANE), as well as for clients on the same network as the brokers (such as Schema Registry). EXTERNAL is for communication with clients outside the broker’s network, such as an application running on your computer connecting to the Kafka cluster running in Docker.
  • KAFKA_ADVERTISED_LISTENERS — The address advertised to the clients. If not set, it defaults to the KAFKA_LISTENERS.
  • KAFKA_CONTROLLER_QUORUM_VOTERS — Comma-separated list of controller voters, using the format {controller-id}@{controller-host}:{controller-port}. Controllers in the quorum use port 29093.
  • KAFKA_PROCESS_ROLES — Specifies the role of the node: broker, controller, or both (in the case of mixed nodes).
  • CLUSTER_ID — A unique 16-byte base64 UUID for the cluster. It can be generated online or with bin/kafka-storage.sh random-uuid.
  • KAFKA_LOG_DIRS — Comma-separated list of directories to store the log files.
  • KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS — The delay used by the GroupCoordinator to start the initial rebalance, when the first member joins an empty group.
  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR — Replication factor for __consumer_offsets, the topic storing committed offsets. We set it explicitly to 3 to match our 3-broker cluster.
  • KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR — Replication factor for __transaction_state, the topic that stores the transaction state and is created automatically on the first request to the transactional API.
  • The replication factor is the number of nodes, including the leader, to which data is replicated.
  • The host’s ports 9092, 9093, and 9094 are mapped to the same ports in the kafka1, kafka2, and kafka3 containers respectively (each node’s EXTERNAL listener), enabling Kafka clients outside the broker’s network to connect to the cluster.
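To make the KAFKA_CONTROLLER_QUORUM_VOTERS format from the list above concrete, here is a small Python sketch that splits such a value into its parts. The Voter type and the parse_quorum_voters function are hypothetical helpers written for this article, not part of Kafka or any client library.

```python
from typing import List, NamedTuple


class Voter(NamedTuple):
    """One entry of the quorum voters list (hypothetical helper type)."""
    node_id: int
    host: str
    port: int


def parse_quorum_voters(value: str) -> List[Voter]:
    """Parse '{id}@{host}:{port},...' into a list of Voter tuples."""
    voters = []
    for entry in value.split(","):
        # Split "1@kafka1:29093" into "1" and "kafka1:29093".
        node_id, _, address = entry.strip().partition("@")
        # Split on the last colon so the port is always the final part.
        host, _, port = address.rpartition(":")
        if not node_id or not host or not port:
            raise ValueError(f"malformed voter entry: {entry!r}")
        voters.append(Voter(int(node_id), host, int(port)))
    return voters


if __name__ == "__main__":
    for voter in parse_quorum_voters("1@kafka1:29093,2@kafka2:29093,3@kafka3:29093"):
        print(voter)
```

A check like this can be handy before starting the cluster, for example to confirm that every node ID in the voters list matches a KAFKA_NODE_ID in the compose file.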

Additional KRaft properties for each Kafka node can be configured using the server.properties file. When setting these properties in Docker Compose, remember to use the “KAFKA_” prefix.
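The naming pattern visible in the compose file (process.roles becomes KAFKA_PROCESS_ROLES, and so on) can be sketched in Python. This covers only the simple case of dot-separated property names; names containing dashes or underscores follow extra rules in Confluent's image documentation that this sketch deliberately does not handle. The function name to_compose_env is hypothetical.

```python
def to_compose_env(property_name: str, prefix: str = "KAFKA_") -> str:
    """Convert a dot-separated property name to its environment variable form.

    Assumption: only the simple pattern seen in this article is handled
    (upper-case the name, replace dots with underscores, add the prefix).
    """
    if "-" in property_name or "_" in property_name:
        raise ValueError("names with dashes or underscores are outside this sketch")
    return prefix + property_name.upper().replace(".", "_")


if __name__ == "__main__":
    for name in ("process.roles", "controller.quorum.voters", "log.dirs"):
        print(name, "->", to_compose_env(name))
```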

Schema Registry Configuration

The schema registry configuration includes the following properties:

  • SCHEMA_REGISTRY_HOST_NAME — Advertised host name, required for intercommunication between multiple instances of Schema Registry.
  • SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS — List of Kafka brokers to connect to.
  • SCHEMA_REGISTRY_LISTENERS — Comma-separated list of listeners for API requests supporting HTTP or HTTPS. Defaults to http://0.0.0.0:8081.

For additional Schema Registry configuration properties, refer to Confluent’s Schema Registry Configuration.
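Once the containers are up, a quick way to confirm that Schema Registry is reachable from the host is to call its REST API. The sketch below uses only the Python standard library and assumes the registry is published on localhost:8081, as in the compose file above; subjects_url and list_subjects are hypothetical helper names. On a fresh cluster the call should return an empty list, since no schemas have been registered yet.

```python
import json
from urllib.request import urlopen


def subjects_url(base_url: str) -> str:
    """Build the URL of the endpoint that lists registered subjects."""
    return base_url.rstrip("/") + "/subjects"


def list_subjects(base_url: str = "http://localhost:8081", timeout: float = 5.0) -> list:
    """Return the subject names known to Schema Registry.

    Assumption: the registry is reachable at base_url without authentication.
    Raises urllib.error.URLError if the service is not up yet.
    """
    with urlopen(subjects_url(base_url), timeout=timeout) as response:
        return json.load(response)


# Example (requires the running setup):
#   print(list_subjects())
```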

Running the setup

As we execute the “docker-compose up -d” command, the cluster springs to life, operating in KRaft mode:

Running in KRaft mode...
===> Running preflight checks ...
===> Check if /var/lib/kafka/data is writable ...
===> Running in KRaft mode, skipping Zookeeper health check...
===> Using provided cluster id ciWo7IWazngRchmPES6q5A== ...
===> Launching ...
===> Launching kafka ...

A Follower logs

Inspecting the logs of the kafka3 node, we observe the initiation of the Leader Election Process:

INFO [Controller 3] In the new epoch 1, the leader is (none). (org.apache.kafka.controller.QuorumController)

The kafka3 node casts its vote, designating the node with ID 2 as the leader:

INFO [RaftManager nodeId=3] Completed transition to Voted(epoch=1, votedId=2, voters=[1, 2, 3], electionTimeoutMs=1358) (org.apache.kafka.raft.QuorumState)
INFO [RaftManager nodeId=3] Vote request VoteRequestData(clusterId='ciWo7IWazngRchmPES6q5A', topics=[TopicData(topicName='__cluster_metadata', partitions=[PartitionData(partitionIndex=0, candidateEpoch=1, candidateId=2, lastOffsetEpoch=0, lastOffset=0)])]) with epoch 1 is granted (org.apache.kafka.raft.KafkaRaftClient)

Having cast its vote in favor of the node with ID 2 (kafka2), kafka3 transitions from the voted state to a follower:

INFO [RaftManager nodeId=3] Completed transition to FollowerState(fetchTimeoutMs=2000, epoch=1, leaderId=2, voters=[1, 2, 3], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) (org.apache.kafka.raft.QuorumState)
INFO [Controller 3] In the new epoch 1, the leader is 2. (org.apache.kafka.controller.QuorumController)
INFO [RaftManager nodeId=3] High watermark set to Optional[LogOffsetMetadata(offset=4, metadata=Optional.empty)] for the first time for epoch 1 (org.apache.kafka.raft.FollowerState)
INFO [MetadataLoader 3] initializeNewPublishers: The loader is still catching up because we have loaded up to offset -1, but the high water mark is 4 (org.apache.kafka.image.loader.MetadataLoader)
INFO [MetadataLoader 3] handleSnapshot: generated a metadata delta from a snapshot at offset 3 in 28441 us. (org.apache.kafka.image.loader.MetadataLoader)
INFO [MetadataLoader 3] handleCommit: The loader finished catching up to the current high water mark of 4 (org.apache.kafka.image.loader.MetadataLoader)
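If you want to find the elected leader across several nodes' logs without reading them by eye, the FollowerState transition line shown above can be matched with a regular expression. This is a sketch only: it assumes the log line looks exactly like the one in this article, and Kafka's log wording is not a stable interface, so the pattern may need adjusting for other versions. The function name leader_from_log_line is hypothetical.

```python
import re
from typing import Optional, Tuple

# Matches the epoch and leaderId fields of a FollowerState transition line.
FOLLOWER_RE = re.compile(
    r"Completed transition to FollowerState\(.*?epoch=(\d+), leaderId=(\d+)"
)


def leader_from_log_line(line: str) -> Optional[Tuple[int, int]]:
    """Return (epoch, leader_id) if the line records a follower transition."""
    match = FOLLOWER_RE.search(line)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


if __name__ == "__main__":
    sample = (
        "INFO [RaftManager nodeId=3] Completed transition to "
        "FollowerState(fetchTimeoutMs=2000, epoch=1, leaderId=2, voters=[1, 2, 3], "
        "highWatermark=Optional.empty, fetchingSnapshot=Optional.empty) "
        "(org.apache.kafka.raft.QuorumState)"
    )
    print(leader_from_log_line(sample))
```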

A Leader logs

INFO [RaftManager nodeId=3] Completed transition to Leader(localId=3, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=false), 2=ReplicaState(nodeId=2, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=false), 3=ReplicaState(nodeId=3, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState)
INFO [Controller 3] Becoming the active controller at epoch 1, committed offset -1, committed epoch -1 (org.apache.kafka.controller.QuorumController)

After starting the cluster using docker-compose, the log directories will be created under the specified directory (/tmp/kraft-combined-logs) as configured.

Figure: The configured logs directory (collapsed __cluster_metadata-0 directory).

In conclusion, this article has presented a streamlined Docker setup for deploying a Kafka cluster in KRaft mode and integrating a Schema Registry. I encourage you to refer to my previous article on KRaft architecture for a deeper understanding of the concepts and processes discussed here. By leveraging the insights from both articles, you’ll gain the expertise needed to confidently implement KRaft-based Kafka clusters in your technical projects.
