Tags: go, apache-kafka, kafka-producer-api

Kafka Golang producer: partition count changes after error


I have a Kafka producer that temporarily lost communication with the brokers. Each topic is defined with 10 partitions. When the connection is lost, I see the following logs in the process running the producer:

%3|1690891270.524|FAIL|rdkafka#producer-1| [thrd:my_ip:9092/bootstrap]: my_ip:9092/bootstrap: Connect to ipv4#my_ip:9092 failed: Connection refused (after 0ms in state CONNECT, 10 identical error(s) suppressed)
%3|1690891273.524|FAIL|rdkafka#producer-1| [thrd:my_ip:9093/bootstrap]: my_ip:9093/bootstrap: Connect to ipv4#my_ip:9093 failed: Connection refused (after 0ms in state CONNECT, 7 identical error(s) suppressed)
%3|1690891273.584|FAIL|rdkafka#producer-1| [thrd:hostname:9094/1001]: hostname:9094/1001: Connect to ipv4#my_ip:9094 failed: Connection refused (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%4|1690891277.639|CLUSTERID|rdkafka#producer-1| [thrd:main]: Broker my_ip:9092/bootstrap reports different ClusterId "TkRgxModQH-mlCkT9mr3lQ" than previously known "o4j9GSQrQ5Svuwvq4uiWQQ": a client must not be simultaneously connected to multiple clusters
%5|1690891278.529|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic my_topic partition count changed from 10 to 1

My producer configuration is as basic as it gets:

    config := kafka.ConfigMap{
        "security.protocol": "plaintext",
        "bootstrap.servers": my_ip,
    }

    producer, err := kafka.NewProducer(&config)
    if err != nil {
        log.Fatalf("failed to create producer: %v", err)
    }

My Docker Compose file with Kafka:

  zoo1:
    image: "${IMAGE_ZOOKEEPER}"
    hostname: zoo1
    container_name: zoo1
    ports:
      - "127.0.0.1:2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    networks:
      - internal_network

  zoo2:
    image: "${IMAGE_ZOOKEEPER}"
    hostname: zoo2
    container_name: zoo2
    ports:
      - "127.0.0.1:2182:2182"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    networks:
      - internal_network

  zoo3:
    image: "${IMAGE_ZOOKEEPER}"
    hostname: zoo3
    container_name: zoo3
    ports:
      - "127.0.0.1:2183:2183"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2183
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888
    networks:
      - internal_network

  kafka1:
    image: "${IMAGE_KAFKA}"
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:29092,EXTERNAL://niro:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    networks:
      - internal_network

  kafka2:
    image: "${IMAGE_KAFKA}"
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:29093,EXTERNAL://niro:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    networks:
      - internal_network

  kafka3:
    image: "${IMAGE_KAFKA}"
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29094,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:29094,EXTERNAL://niro:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    networks:
      - internal_network

  kafka-topic-create:
    image: "${IMAGE_KAFKA}"
    hostname: kafka-topic-create
    container_name: kafka-topic-create
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka1:29092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka1:29092 --create --if-not-exists --topic my_topic --replication-factor 1 --partitions 10

      echo -e 'Successfully created the following topics for Kafka1:'
      kafka-topics --bootstrap-server kafka1:29092 --list

      echo -e 'Kafka2:'
      kafka-topics --bootstrap-server kafka2:29093 --list
      echo -e 'Kafka3:'
      kafka-topics --bootstrap-server kafka3:29094 --list
      "

    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
    networks:
      - internal_network

The images are confluentinc/cp-kafka:7.3.2 and confluentinc/cp-zookeeper:7.3.2.

Why is Kafka changing the partition count on its own, and how do I prevent this? Thank you.


Solution

  • First, set this environment variable on each broker:

        KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'

    This should resolve the "Topic my_topic partition count changed from 10 to 1" warning.
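
    In the Compose file above, that means adding one line to the environment block of each broker service (kafka1, kafka2, kafka3); the cp-kafka image translates the variable into the broker's auto.create.topics.enable setting. For example:

        kafka1:
          environment:
            KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
            # ... keep the existing listener and ZooKeeper settings as they are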

    The reason it changes to one is that your broker is apparently restarting, and one partition is the broker default (num.partitions=1) for auto-created topics: your previous ten-partition topic had been deleted, and the first request for it after the restart re-created it. The CLUSTERID warning in your logs, reporting a different cluster ID than previously known, also suggests the cluster state was wiped and re-created, not merely restarted.

    If you want to find out why the topic was deleted, or why Kafka keeps restarting or refusing connections, you need to debug the broker logs, not your Golang app.
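
    With this Compose setup, a starting point could look like the following (the exact flags are just a suggestion):

        # tail a broker's log and look for the shutdown reason
        docker logs kafka1 --tail 200

        # check whether the containers keep restarting or were OOM-killed
        docker ps -a --filter name=kafka
        docker inspect -f '{{.State.OOMKilled}}' kafka1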

    You also don't need three brokers here, especially considering that your topic has a replication factor of 1; a single broker can host a topic with 10 partitions just as well.
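
    To check a topic's current partition count and replica placement, you can describe it with the same CLI that your kafka-topic-create service already uses:

        kafka-topics --bootstrap-server kafka1:29092 --describe --topic my_topic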


    Maybe your brokers are failing because your machine is running out of memory, for example. Docker for Windows/Mac defaults to only 2 GB of RAM, I think, while each broker needs at least 1 GB of heap space, and then there is also ZooKeeper on top of that.
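
    A quick way to confirm or rule that out is a one-shot snapshot of per-container resource usage while the stack is running:

        # prints current memory and CPU usage for each container, then exits
        docker stats --no-stream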