I deployed the a 3 node kafka cluster using the following config in docker-compose
Kafka cluster:
services:
kafka1:
image: confluentinc/cp-kafka:latest
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
environment:
CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_JMX_PORT: 9997
KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
volumes:
- kafka1-data:/var/lib/kafka/data
- ./scripts/update_run.sh:/tmp/update_run.sh
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
kafka2:
image: confluentinc/cp-kafka:latest
hostname: kafka2
container_name: kafka2
ports:
- "9093:9092"
environment:
CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka2:29092,PLAINTEXT_HOST://localhost:9093'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_JMX_PORT: 9997
KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka2 -Dcom.sun.management.jmxremote.rmi.port=9998'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 2
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka2:29092,CONTROLLER://kafka2:29093,PLAINTEXT_HOST://0.0.0.0:9093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
volumes:
- kafka2-data:/var/lib/kafka/data
- ./scripts/update_run.sh:/tmp/update_run.sh
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
kafka3:
image: confluentinc/cp-kafka:latest
hostname: kafka3
container_name: kafka3
ports:
- "9094:9092"
environment:
CLUSTER_ID: "7bes6xCzSMeUR-RRas3gMQ"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka3:29092,PLAINTEXT_HOST://localhost:9094'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 5000
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_JMX_PORT: 9997
KAFKA_JMX_OPTS: '-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka3 -Dcom.sun.management.jmxremote.rmi.port=9999'
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 3
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka3:29092,CONTROLLER://kafka3:29093,PLAINTEXT_HOST://0.0.0.0:9094'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
volumes:
- kafka3-data:/var/lib/kafka/data
- ./scripts/update_run.sh:/tmp/update_run.sh
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
My spring boot application is unable to maintain connection and produces the following logs
2024-03-29T20:00:23.440-06:00 [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00 [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00 [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Group coordinator localhost:9093 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted.
2024-03-29T20:00:23.440-06:00 [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Group coordinator localhost:9093 (id: 2147483645 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted.
2024-03-29T20:00:23.440-06:00 [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Requesting disconnect from last known coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.440-06:00 [Consumer clientId=consumer-transaction-group-1, groupId=transaction-group] Requesting disconnect from last known coordinator localhost:9093 (id: 2147483645 rack: null)
2024-03-29T20:00:23.547-06:00 [Consumer clientId=consumer-transaction-group-2, groupId=transaction-group] Discovered group coordinator localhost:9093 (id: 2147483645 rack: null)
This is my kafka properties in my app.
kafka:
bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
listener:
ack-mode: manual
properties:
schema.registry.url: http://localhost:8185/
consumer:
group-id: transaction-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
transaction-id-prefix: tx-
properties:
enable-idempotence: true
What is causing the connection drop? This issue disappears when I depoly a one node kafka cluster.
In your Compose port mappings, you have 9093:9092
, for example, but there's no listener
configured for 9092 within the container
It disappears for a one node cluster since the ports are all correct.