Tags: apache-kafka, docker-compose, kraft

Kafka KRaft mode with docker-compose


I am trying to run Kafka in KRaft mode, but I keep running into errors. Here is my best attempt so far (I used GPT to generate it because I couldn't find the relevant parts of the documentation):

docker-compose.yml

services:
  # Kafka controller: manages the overall state of the Kafka cluster
  kafka-controller-0:
    image: apache/kafka:3.7.1
    container_name: kafka-controller-0
    environment:
      KAFKA_BROKER_ID: 0
      # Listener for the controller and broker
      KAFKA_CONTROLLER_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9090
      # Port for Raft communication (consensus)
      KAFKA_RAFT_PORT: 9091
      # Directory for Raft and Kafka data
      KAFKA_RAFT_DATA_DIR: /var/lib/kafka/data
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ./data/kafka-controller-0:/var/lib/kafka/data
    ports:
      # expose broker port
      - "9092:9092"
    networks:
      - kafka-net

  # Kafka broker: stores and manages partitions of topics
  kafka-broker-0:
    image: apache/kafka:3.7.1
    container_name: kafka-broker-0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_CONTROLLER_LISTENERS: PLAINTEXT://:9092
      KAFKA_RAFT_PORT: 9091
      KAFKA_RAFT_DATA_DIR: /var/lib/kafka/data
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ./data/kafka-broker-0:/var/lib/kafka/data
    ports:
      # expose broker port
      - "9093:9092"
    networks:
      - kafka-net

  kafka-broker-1:
    image: apache/kafka:3.7.1
    container_name: kafka-broker-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_CONTROLLER_LISTENERS: PLAINTEXT://:9092
      KAFKA_RAFT_PORT: 9091
      KAFKA_RAFT_DATA_DIR: /var/lib/kafka/data
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ./data/kafka-broker-1:/var/lib/kafka/data
    ports:
      - "9094:9092"
    networks:
      - kafka-net

networks:
  kafka-net:

Unfortunately, this fails with the following error:

Attaching to kafka-broker-0, kafka-broker-1, kafka-controller-0
kafka-controller-0  | ===> User
kafka-broker-0      | ===> User
kafka-broker-0      | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka-controller-0  | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka-broker-0      | ===> Setting default values of environment variables if not already set.
kafka-controller-0  | ===> Setting default values of environment variables if not already set.
kafka-controller-0  | CLUSTER_ID not set. Setting it to default value: "5L6g3nShT-eMCtK--X86sw"
kafka-broker-0      | CLUSTER_ID not set. Setting it to default value: "5L6g3nShT-eMCtK--X86sw"
kafka-controller-0  | ===> Configuring ...
kafka-broker-0      | ===> Configuring ...
kafka-controller-0  | ===> Launching ... 
kafka-broker-0      | ===> Launching ... 
kafka-broker-0      | ===> Using provided cluster id 5L6g3nShT-eMCtK--X86sw ...
kafka-controller-0  | ===> Using provided cluster id 5L6g3nShT-eMCtK--X86sw ...
kafka-broker-1      | ===> User
kafka-broker-1      | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka-broker-1      | ===> Setting default values of environment variables if not already set.
kafka-broker-1      | CLUSTER_ID not set. Setting it to default value: "5L6g3nShT-eMCtK--X86sw"
kafka-broker-1      | ===> Configuring ...
kafka-broker-1      | ===> Launching ... 
kafka-broker-1      | ===> Using provided cluster id 5L6g3nShT-eMCtK--X86sw ...
kafka-broker-0      | Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration `zookeeper.connect` which has no default value.
kafka-broker-0      |     at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2299)
kafka-broker-0      |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2290)
kafka-broker-0      |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1638)
kafka-broker-0      |     at kafka.tools.StorageTool$.$anonfun$execute$1(StorageTool.scala:71)
kafka-broker-0      |     at scala.Option.flatMap(Option.scala:283)
kafka-broker-0      |     at kafka.tools.StorageTool$.execute(StorageTool.scala:71)
kafka-broker-0      |     at kafka.tools.StorageTool$.main(StorageTool.scala:52)
kafka-broker-0      |     at kafka.docker.KafkaDockerWrapper$.main(KafkaDockerWrapper.scala:47)
kafka-broker-0      |     at kafka.docker.KafkaDockerWrapper.main(KafkaDockerWrapper.scala)
kafka-controller-0  | Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration `zookeeper.connect` which has no default value.
kafka-controller-0  |     at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2299)
kafka-controller-0  |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2290)
kafka-controller-0  |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1638)
kafka-controller-0  |     at kafka.tools.StorageTool$.$anonfun$execute$1(StorageTool.scala:71)
kafka-controller-0  |     at scala.Option.flatMap(Option.scala:283)
kafka-controller-0  |     at kafka.tools.StorageTool$.execute(StorageTool.scala:71)
kafka-controller-0  |     at kafka.tools.StorageTool$.main(StorageTool.scala:52)
kafka-controller-0  |     at kafka.docker.KafkaDockerWrapper$.main(KafkaDockerWrapper.scala:47)
kafka-controller-0  |     at kafka.docker.KafkaDockerWrapper.main(KafkaDockerWrapper.scala)
kafka-broker-1      | Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration `zookeeper.connect` which has no default value.
kafka-broker-1      |     at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2299)
kafka-broker-1      |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2290)
kafka-broker-1      |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1638)
kafka-broker-1      |     at kafka.tools.StorageTool$.$anonfun$execute$1(StorageTool.scala:71)
kafka-broker-1      |     at scala.Option.flatMap(Option.scala:283)
kafka-broker-1      |     at kafka.tools.StorageTool$.execute(StorageTool.scala:71)
kafka-broker-1      |     at kafka.tools.StorageTool$.main(StorageTool.scala:52)
kafka-broker-1      |     at kafka.docker.KafkaDockerWrapper$.main(KafkaDockerWrapper.scala:47)
kafka-broker-1      |     at kafka.docker.KafkaDockerWrapper.main(KafkaDockerWrapper.scala)
kafka-controller-0 exited with code 1
kafka-broker-1 exited with code 1
kafka-broker-0 exited with code 1

Adding `KAFKA_ZOOKEEPER_CONNECT: ""` to all containers removes this error but introduces a new one:

kafka-broker-0      | The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode.
kafka-broker-1      | The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode.
kafka-controller-0  | The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode.

What is the correct way to achieve this, and is there any good documentation that would help me proceed more systematically?
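For context on both errors: the `apache/kafka` image turns `KAFKA_*` environment variables into broker properties, and the errors suggest that once such variables are passed, the image's bundled default KRaft configuration is no longer applied. The sketch below approximates the naming convention described for that image (strip `KAFKA_`, then `___` becomes `-`, `__` becomes `_`, `_` becomes `.`, and the result is lowercased) and applies it to the `kafka-controller-0` variables from the question. It is not the image's actual code, and the `NON_PROPERTY_VARS` set is an illustrative assumption. The result contains `broker.id`, `controller.listeners`, and `raft.port`, but no `process.roles`. In Kafka 3.x a node without `process.roles` runs in ZooKeeper mode, which is why it first demands `zookeeper.connect` and then refuses to format storage for a "legacy cluster". Note also that `controller.listeners`, `raft.port`, and `raft.data.dir` are not Kafka settings; the relevant ones are `listeners`, `controller.listener.names`, and `controller.quorum.voters`.

```python
from typing import Dict, Mapping

# Assumption: launcher/JVM variables that the real image does not treat as
# broker properties. Illustrative only, not an exhaustive list.
NON_PROPERTY_VARS = {"KAFKA_HEAP_OPTS", "KAFKA_OPTS", "KAFKA_JMX_OPTS", "KAFKA_LOG4J_OPTS"}


def env_to_property(name: str) -> str:
    """Convert e.g. KAFKA_CONTROLLER_QUORUM_VOTERS to controller.quorum.voters."""
    key = name[len("KAFKA_"):]
    key = key.replace("___", "-")  # triple underscore -> dash
    key = key.replace("__", "\0")  # double underscore -> literal underscore (via placeholder)
    key = key.replace("_", ".")    # single underscore -> dot
    key = key.replace("\0", "_")   # restore the literal underscores
    return key.lower()


def env_to_properties(env: Mapping[str, object]) -> Dict[str, str]:
    """Collect properties from KAFKA_* variables; others (e.g. CLUSTER_ID) are skipped."""
    return {
        env_to_property(name): str(value)
        for name, value in env.items()
        if name.startswith("KAFKA_") and name not in NON_PROPERTY_VARS
    }


def is_kraft(props: Mapping[str, str]) -> bool:
    """In Kafka 3.x a node runs in KRaft mode only if process.roles is set."""
    return bool(props.get("process.roles"))


# The kafka-controller-0 environment from the question.
controller_env = {
    "KAFKA_BROKER_ID": 0,
    "KAFKA_CONTROLLER_LISTENERS": "PLAINTEXT://:9092,CONTROLLER://:9090",
    "KAFKA_RAFT_PORT": 9091,
    "KAFKA_RAFT_DATA_DIR": "/var/lib/kafka/data",
    "KAFKA_LOG_DIRS": "/var/lib/kafka/data",
}

props = env_to_properties(controller_env)
for key in sorted(props):
    print(f"{key}={props[key]}")
print("KRaft mode:", is_kraft(props))
```

Running it prints `broker.id`, `controller.listeners`, `log.dirs`, `raft.data.dir`, and `raft.port` with their values, followed by `KRaft mode: False`.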


Solution

  • I am using Kafka to ingest data into QuestDB, and this compose file works for me. I define two nodes that each act as both broker and controller, so I keep the common definitions at the top (as YAML extension fields with anchors) and merge them into both brokers.

    (Updated) I am also including my configuration for two Kafka Connect instances and a single Schema Registry. In a production environment you might want more than one Schema Registry for high availability, and you might want to run the controller and broker roles on separate nodes, but hopefully this helps you get started.

    Note that I am also including, for reference, how to register a Kafka Connect connector automatically on startup. For this to work, the connector jar must be available to the workers (either copied locally or downloaded from Confluent Hub). In my case, I mount the local folder ./kafka-connect-plugins, which contains the jar for questdb-connect, and reference it in the workers' plugin path. As you can see, the connector only needs to be registered on one of the Kafka Connect instances; it is then automatically registered for the rest of the workers in the cluster.

    # Common kafka broker environment variables
    x-kafka-broker-env: &kafka-broker-env
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_ENABLE_KRAFT: yes
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093,2@broker-2:29093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_METADATA_LOG_DIR: /tmp/kraft-metadata-logs
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MTkwSDkzMDg5KTdFNDJCRU'
    
    
    # Common kafka broker configuration
    x-kafka-broker-common: &kafka-broker-common
      image: confluentinc/cp-kafka:7.7.0
      extra_hosts:
        - "host.docker.internal:host-gateway"
    
    
    # Common kafka connect environment variables
    x-kafka-connect-env: &kafka-connect-env
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092,broker-2:29092'
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 500
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 2
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 2
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 2
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.3-0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_ERRORS_RETRY_TIMEOUT: 90000
      CONNECT_ERRORS_RETRY_DELAY_MAX_MS: 120000
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components"
      QUESTDB_HTTP_ENDPOINT: "${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000}"
    
    
    # Common kafka connect configuration
    x-kafka-connect-common: &kafka-connect-common
      image: confluentinc/cp-kafka-connect:7.7.0
      depends_on:
        - broker-1
        - broker-2
      extra_hosts:
        - "host.docker.internal:host-gateway"
      volumes:
        - ./kafka-connect-plugins:/etc/kafka-connect/jars
    
    
    services:
      questdb:
        image: questdb/questdb:8.1.1
        container_name: rta_questdb
        restart: always
        ports:
          - "8812:8812"
          - "9000:9000"
          - "9009:9009"
          - "9003:9003"
        extra_hosts:
          - "host.docker.internal:host-gateway"
        environment:
          - QDB_METRICS_ENABLED=TRUE     
        volumes:
          - ./questdb/questdb_root:/var/lib/questdb/:rw
    
      broker-1:
        <<: *kafka-broker-common
        hostname: broker
        container_name: rta_kafka_broker
        ports:
          - "9092:9092"
          - "9101:9101"
        extra_hosts:
          - "host.docker.internal:host-gateway"
        volumes:
          - ./broker-1/kafka-data:/var/lib/kafka/data
          - ./broker-1/kafka-secrets:/etc/kafka/secrets
          - ./broker-1/tmp:/tmp
        environment:
          <<: *kafka-broker-env
          KAFKA_NODE_ID: 1
          KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
          KAFKA_JMX_PORT: 9101
          KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'     
    
      broker-2:
        <<: *kafka-broker-common
        hostname: broker-2
        container_name: rta_kafka_broker_2
        ports:
          - "9093:9093"
          - "9102:9102"      
        extra_hosts:
          - "host.docker.internal:host-gateway"
        volumes:     
          - ./broker-2/kafka-data:/var/lib/kafka/data
          - ./broker-2/kafka-secrets:/etc/kafka/secrets
          - ./broker-2/tmp:/tmp
        environment:
          <<: *kafka-broker-env
          KAFKA_NODE_ID: 2
          KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:29092,PLAINTEXT_HOST://localhost:9093'
          KAFKA_JMX_PORT: 9102
          KAFKA_LISTENERS: 'PLAINTEXT://broker-2:29092,CONTROLLER://broker-2:29093,PLAINTEXT_HOST://0.0.0.0:9093'
          
    
      schema_registry:
        image: confluentinc/cp-schema-registry:7.7.0
        hostname: schema_registry
        container_name: rta_schema_registry
        depends_on:
          - broker-1
          - broker-2
        ports:
          - "8081:8081"
        extra_hosts:
          - "host.docker.internal:host-gateway"
        environment:
          SCHEMA_REGISTRY_HOST_NAME: schema_registry
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092,broker-2:29092'
          SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
          SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'
    
      kafka-connect-1:
        <<: *kafka-connect-common
        hostname: kafka-connect
        container_name: rta_kafka_connect
        ports:
          - "8083:8083"
        environment:
          <<: *kafka-connect-env
          CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
          CONNECT_LISTENERS: http://0.0.0.0:8083
        command:
        - bash
        - -c
        - |
          # Launch Kafka Connect
          /etc/confluent/docker/run &
          #
          # Wait for Kafka Connect listener
          echo "Waiting for Kafka Connect to start listening on localhost ⏳"
          while : ; do
            curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
            echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
            if [ $$curl_status -eq 200 ] ; then
              break
            fi
            sleep 5
          done
    
          echo -e "\n--\n+> Registering QuestDB Connector"
    
          curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/questdb-trades/config -d '{
              "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
              "tasks.max": "5",
              "topics": "trades",
              "client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
              "name": "questdb-trades",
              "value.converter": "io.confluent.connect.avro.AvroConverter",
              "value.converter.schema.registry.url": "http://schema_registry:8081",
              "include.key": false,
              "key.converter": "io.confluent.connect.avro.AvroConverter",
              "key.converter.schema.registry.url": "http://schema_registry:8081",
              "table": "trades",
              "symbols": "symbol, side",
              "timestamp.field.name": "timestamp",
              "value.converter.schemas.enable": true
          }'
    
          sleep infinity
    
      kafka-connect-2:
        <<: *kafka-connect-common
        hostname: kafka-connect-2
        container_name: rta_kafka_connect_2
        ports:
          - "8084:8084"
        environment:
          <<: *kafka-connect-env
          CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect-2
          CONNECT_LISTENERS: http://0.0.0.0:8084
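The `KAFKA_CONTROLLER_QUORUM_VOTERS` value in the compose file above has the form `{node id}@{host}:{controller port}`, one entry per controller, where each id must match that node's `KAFKA_NODE_ID` and the host and port must match its `CONTROLLER` listener. The sketch below builds that string and also computes how many voters a Raft quorum can lose, since a strict majority must stay up. The helper names are hypothetical. One consequence for this setup: with two combined broker/controller nodes, losing either node loses the quorum, so three voters is the smallest configuration that tolerates a failure.

```python
from typing import Mapping


def quorum_voters(controllers: Mapping[int, str], port: int) -> str:
    """Build a controller.quorum.voters value such as '1@broker:29093,2@broker-2:29093'."""
    return ",".join(f"{node_id}@{host}:{port}" for node_id, host in sorted(controllers.items()))


def tolerated_failures(voters: int) -> int:
    """Raft needs a strict majority of voters alive to elect a leader and commit."""
    if voters < 1:
        raise ValueError("a quorum needs at least one voter")
    majority = voters // 2 + 1
    return voters - majority


# Node ids and hostnames taken from broker-1 and broker-2 in the compose file.
print(quorum_voters({1: "broker", 2: "broker-2"}, 29093))
for n in (1, 2, 3, 5):
    print(f"{n} voter(s): survives {tolerated_failures(n)} failure(s)")
```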
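If the Kafka CLI is not at hand for the `CLUSTER_ID` step mentioned in the compose comments, a value of the same shape can be produced with Python's standard library. As far as I know, `kafka-storage.sh random-uuid` prints a random (version 4) UUID encoded as URL-safe Base64 without padding, i.e. 22 characters, and avoids values starting with `-`; the sketch mirrors that but is an approximation, not the tool's implementation. The function names are hypothetical. Generate the ID once and reuse it for every node, since all nodes of a cluster must share the same cluster ID (the compose file does this by putting `CLUSTER_ID` in the shared environment block).

```python
import base64
import string
import uuid

URLSAFE_ALPHABET = set(string.ascii_letters + string.digits + "-_")


def random_cluster_id() -> str:
    """Random version 4 UUID as unpadded URL-safe Base64 (22 characters)."""
    while True:
        encoded = base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("ascii").rstrip("=")
        # Skip IDs with a leading dash, which a CLI could mistake for an option.
        if not encoded.startswith("-"):
            return encoded


def looks_like_cluster_id(value: str) -> bool:
    """Check that a string is 22 URL-safe Base64 characters encoding 16 bytes."""
    if len(value) != 22 or not set(value) <= URLSAFE_ALPHABET:
        return False
    try:
        return len(base64.urlsafe_b64decode(value + "==")) == 16
    except ValueError:  # binascii.Error is a subclass of ValueError
        return False


cluster_id = random_cluster_id()
print(cluster_id)
print(looks_like_cluster_id("MTkwSDkzMDg5KTdFNDJCRU"))  # the value used in the answer
```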
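The `command:` script of `kafka-connect-1` polls `GET /connectors` until the worker answers 200 and then sends the connector config with `PUT /connectors/questdb-trades/config`, which creates the connector if it does not exist and updates it otherwise. If you prefer to register connectors from the host (for example from a deployment script), the same two steps can be sketched with Python's standard library. The helper names, retry limits, and the `http://localhost:8083` base URL (the port published by `kafka-connect-1`) are assumptions; the status probe and sleep function are injectable so the polling logic can be exercised without a running worker.

```python
import json
import time
import urllib.error
import urllib.request
from typing import Callable, Mapping, Optional


def http_status(url: str, timeout: float = 5.0) -> Optional[int]:
    """Return the HTTP status of a GET request, or None if nothing answers yet."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code
    except OSError:  # connection refused, reset, timeout, DNS failure, ...
        return None


def wait_for_connect(
    base_url: str,
    get_status: Callable[[str], Optional[int]] = http_status,
    interval: float = 5.0,
    max_attempts: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll GET {base_url}/connectors until it returns 200, like the bash loop."""
    for _ in range(max_attempts):
        if get_status(f"{base_url}/connectors") == 200:
            return True
        sleep(interval)
    return False


def connector_config_request(base_url: str, name: str, config: Mapping[str, object]) -> urllib.request.Request:
    """Build PUT /connectors/{name}/config, which creates or updates the connector."""
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Settings copied from the answer, with the QuestDB endpoint default filled in.
# Hostnames here are resolved inside the Connect container, not on the host.
QUESTDB_TRADES = {
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "tasks.max": "5",
    "topics": "trades",
    "client.conf.string": "http::addr=host.docker.internal:9000;",
    "name": "questdb-trades",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema_registry:8081",
    "include.key": False,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema_registry:8081",
    "table": "trades",
    "symbols": "symbol, side",
    "timestamp.field.name": "timestamp",
    "value.converter.schemas.enable": True,
}


def register_questdb_trades(base_url: str = "http://localhost:8083") -> int:
    """Wait for the worker, then create or update the questdb-trades connector."""
    if not wait_for_connect(base_url):
        raise RuntimeError(f"Kafka Connect at {base_url} did not become ready")
    request = connector_config_request(base_url, "questdb-trades", QUESTDB_TRADES)
    with urllib.request.urlopen(request, timeout=10) as resp:
        return resp.status
```

Calling `register_questdb_trades()` once the stack is up does the same job as the bash block; as in the answer, registering on a single worker is enough because the workers share the connector configuration through the Connect cluster.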