I have a Kafka producer that temporarily lost its connection to the brokers. Each topic is defined with 10 partitions. When the connection is lost, I see the following log in the process running the producer:
%3|1690891270.524|FAIL|rdkafka#producer-1| [thrd:my_ip:9092/bootstrap]: my_ip:9092/bootstrap: Connect to ipv4#my_ip:9092 failed: Connection refused (after 0ms in state CONNECT, 10 identical error(s) suppressed)
%3|1690891273.524|FAIL|rdkafka#producer-1| [thrd:my_ip:9093/bootstrap]: my_ip:9093/bootstrap: Connect to ipv4#my_ip:9093 failed: Connection refused (after 0ms in state CONNECT, 7 identical error(s) suppressed)
%3|1690891273.584|FAIL|rdkafka#producer-1| [thrd:hostname:9094/1001]: hostname:9094/1001: Connect to ipv4#my_ip:9094 failed: Connection refused (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%4|1690891277.639|CLUSTERID|rdkafka#producer-1| [thrd:main]: Broker my_ip:9092/bootstrap reports different ClusterId "TkRgxModQH-mlCkT9mr3lQ" than previously known "o4j9GSQrQ5Svuwvq4uiWQQ": a client must not be simultaneously connected to multiple clusters
%5|1690891278.529|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic my_topic partition count changed from 10 to 1
My producer configuration is as basic as it gets:
config := kafka.ConfigMap{
	"security.protocol": "plaintext",
	"bootstrap.servers": my_ip,
}
producer, err := kafka.NewProducer(&config)
My Docker Compose file with Kafka:
zoo1:
  image: "${IMAGE_ZOOKEEPER}"
  hostname: zoo1
  container_name: zoo1
  ports:
    - "127.0.0.1:2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_SERVER_ID: 1
    ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
  networks:
    - internal_network
zoo2:
  image: "${IMAGE_ZOOKEEPER}"
  hostname: zoo2
  container_name: zoo2
  ports:
    - "127.0.0.1:2182:2182"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2182
    ZOOKEEPER_SERVER_ID: 2
    ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
  networks:
    - internal_network
zoo3:
  image: "${IMAGE_ZOOKEEPER}"
  hostname: zoo3
  container_name: zoo3
  ports:
    - "127.0.0.1:2183:2183"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2183
    ZOOKEEPER_SERVER_ID: 3
    ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
  networks:
    - internal_network
kafka1:
  image: "${IMAGE_KAFKA}"
  hostname: kafka1
  container_name: kafka1
  ports:
    - "9092:9092"
    - "29092:29092"
  environment:
    KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:29092,EXTERNAL://niro:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
  depends_on:
    - zoo1
    - zoo2
    - zoo3
  networks:
    - internal_network
kafka2:
  image: "${IMAGE_KAFKA}"
  hostname: kafka2
  container_name: kafka2
  ports:
    - "9093:9093"
    - "29093:29093"
  environment:
    KAFKA_LISTENERS: INTERNAL://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:29093,EXTERNAL://niro:9093
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
  depends_on:
    - zoo1
    - zoo2
    - zoo3
  networks:
    - internal_network
kafka3:
  image: "${IMAGE_KAFKA}"
  hostname: kafka3
  container_name: kafka3
  ports:
    - "9094:9094"
    - "29094:29094"
  environment:
    KAFKA_LISTENERS: INTERNAL://0.0.0.0:29094,EXTERNAL://0.0.0.0:9094
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:29094,EXTERNAL://niro:9094
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
  depends_on:
    - zoo1
    - zoo2
    - zoo3
  networks:
    - internal_network
kafka-topic-create:
  image: "${IMAGE_KAFKA}"
  hostname: kafka-topic-create
  container_name: kafka-topic-create
  depends_on:
    - kafka1
    - kafka2
    - kafka3
  entrypoint: [ '/bin/sh', '-c' ]
  command: |
    "
    # blocks until kafka is reachable
    kafka-topics --bootstrap-server kafka1:29092 --list
    echo -e 'Creating kafka topics'
    kafka-topics --bootstrap-server kafka1:29092 --create --if-not-exists --topic my_topic --replication-factor 1 --partitions 10
    echo -e 'Successfully created the following topics for Kafka1:'
    kafka-topics --bootstrap-server kafka1:29092 --list
    echo -e 'Kafka2:'
    kafka-topics --bootstrap-server kafka2:29093 --list
    echo -e 'Kafka3:'
    kafka-topics --bootstrap-server kafka3:29094 --list
    "
  environment:
    KAFKA_BROKER_ID: ignored
    KAFKA_ZOOKEEPER_CONNECT: ignored
  networks:
    - internal_network
The images are confluentinc/cp-kafka:7.3.2 and confluentinc/cp-zookeeper:7.3.2.
Why is Kafka changing the partition count on its own? How do I prevent this? Thank you.
First, set this environment variable on each broker:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
This should get rid of your "Topic my_topic partition count changed from 10 to 1" message, because the topic can no longer be silently recreated.
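The reason it changes to one is that your brokers are apparently restarting and your previous topic had been deleted. Kafka cannot reduce the partition count of an existing topic, so a drop from 10 to 1 means the topic was created again, and 1 is the default partition count (num.partitions) for auto-created topics. The changed ClusterId in your log suggests the same thing: the client reconnected to a freshly initialized cluster.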
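Because Kafka never shrinks an existing topic, a PARTCNT line whose new count is lower than the old one is a usable signal that the topic was recreated. Here is a minimal sketch of detecting that, assuming the log format shown in the question. parsePartCnt and PartitionChange are hypothetical helpers, not part of librdkafka or confluent-kafka-go, and how you capture the log lines in your app is left out.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// partCntRe matches the message part of a librdkafka PARTCNT log line,
// e.g. "Topic my_topic partition count changed from 10 to 1".
var partCntRe = regexp.MustCompile(`Topic (\S+) partition count changed from (\d+) to (\d+)`)

// PartitionChange is a hypothetical helper type for this sketch only.
type PartitionChange struct {
	Topic    string
	From, To int
}

// parsePartCnt extracts the topic and both partition counts from a log line.
// The boolean is false when the line is not a PARTCNT message.
func parsePartCnt(line string) (PartitionChange, bool) {
	m := partCntRe.FindStringSubmatch(line)
	if m == nil {
		return PartitionChange{}, false
	}
	from, err1 := strconv.Atoi(m[2])
	to, err2 := strconv.Atoi(m[3])
	if err1 != nil || err2 != nil {
		return PartitionChange{}, false
	}
	return PartitionChange{Topic: m[1], From: from, To: to}, true
}

func main() {
	line := `%5|1690891278.529|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic my_topic partition count changed from 10 to 1`
	// A lower count can only come from a delete followed by a re-create.
	if c, ok := parsePartCnt(line); ok && c.To < c.From {
		fmt.Printf("topic %s shrank from %d to %d partitions; it was probably recreated\n", c.Topic, c.From, c.To)
	}
}
```

This only tells you that it happened. The cause still has to come from the broker logs.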
If you want to find out why the topic had been deleted, or why Kafka is not connecting / restarting, you need to debug the broker logs, not your Golang app.
You also don't need three brokers here, especially since your topic has a replication factor of 1. A single broker can host a topic with 10 partitions just as well.
Your brokers may be failing because the machine is running out of memory, for example. Docker for Windows/Mac defaults to only 2 GB of RAM, I think, but each broker needs at least 1 GB of heap, and then there are the ZooKeeper nodes as well.
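To put rough numbers on the memory point, here is a small back-of-the-envelope sketch. The 1 GiB broker heap and the 2 GiB Docker limit are the figures mentioned above. The 512 MiB per ZooKeeper node is an assumption for illustration, so check the actual heap settings of your images.

```go
package main

import "fmt"

// Assumed figures, in MiB. brokerHeapMiB and dockerLimitMiB are the numbers
// from the answer; zkHeapMiB is an assumption made for this sketch.
const (
	brokerHeapMiB  = 1024
	zkHeapMiB      = 512
	dockerLimitMiB = 2048
)

// heapDemandMiB returns the combined JVM heap for the given number of
// brokers and ZooKeeper nodes. Real memory use is higher than heap alone.
func heapDemandMiB(brokers, zkNodes int) int {
	return brokers*brokerHeapMiB + zkNodes*zkHeapMiB
}

// fits reports whether that heap demand stays within the Docker VM limit.
func fits(brokers, zkNodes int) bool {
	return heapDemandMiB(brokers, zkNodes) <= dockerLimitMiB
}

func main() {
	fmt.Println(heapDemandMiB(3, 3), fits(3, 3)) // setup from the question: 4608 false
	fmt.Println(heapDemandMiB(1, 1), fits(1, 1)) // one broker, one ZooKeeper: 1536 true
}
```

Under these assumptions the three-broker, three-ZooKeeper setup asks for more than twice the default Docker memory in heap alone, while a single broker with a single ZooKeeper fits.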