Search code examples
spring-bootapache-kafkadocker-compose

Springboot Kafka Consumer unable to maintain connect to kafka cluster brokers


I deployed the a 3 node kafka cluster using the following config in docker-compose

Kafka cluster:

services:

  kafka1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka1-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

  kafka2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9092"
    environment:
      CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka2:29092,PLAINTEXT_HOST://localhost:9093'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka2 -Dcom.sun.management.jmxremote.rmi.port=9998'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka2:29092,CONTROLLER://kafka2:29093,PLAINTEXT_HOST://0.0.0.0:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka2-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

  kafka3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9092"
    environment:
      CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka3:29092,PLAINTEXT_HOST://localhost:9094'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka3 -Dcom.sun.management.jmxremote.rmi.port=9999'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka3:29092,CONTROLLER://kafka3:29093,PLAINTEXT_HOST://0.0.0.0:9094'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka3-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

My spring boot application is unable to maintain connection and produces the following logs

2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Group coordinator localhost:9093 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted.
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Group coordinator localhost:9093 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted.
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Requesting disconnect from last known coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00  [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Requesting disconnect from last known coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.547-06:00  [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)

This is my kafka properties in my app.

kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    listener:
      ack-mode: manual
    properties:
      schema.registry.url: http://localhost:8185/
    consumer:
      group-id: transaction-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
      transaction-id-prefix: tx- 
      properties:
        enable-idempotence: true

What is causing the connection drop? This issue disappears when I depoly a one node kafka cluster.


Solution

  • In your Compose port mappings, you have 9093:9092, for example, but there's no listener configured for 9092 within the container

    It disappears for a one node cluster since the ports are all correct.