java, apache-kafka, kafka-producer-api

Java Docker Kafka KRaft simple connection


Hi, I'm using Docker to host a local Kafka KRaft server, and I'm trying to make a simple connection to it from Java.

My docker-compose.yml looks like this:

version: "3.5"
services:

  kafka-gen:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka-gen
    container_name: kafka-gen
    volumes:
      - ./scripts/create_cluster_id.sh:/tmp/create_cluster_id.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/create_cluster_id.sh'"

  kafka1:
    image: confluentinc/cp-kafka:7.3.3
    hostname: kafka1
    container_name: kafka1
    ports:
      - "39092:39092"
    environment:
      KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093'
      KAFKA_METADATA_LOG_SEGMENT_MS: 15000
      KAFKA_METADATA_MAX_RETENTION_MS: 1200000
      KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - kafka1-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"

volumes:
  kafka1-data:

and my Java code (using org.apache.kafka:kafka-clients:3.4.0):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:39092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
        "first_topic", "testm"
);
kafkaProducer.send(producerRecord);

kafkaProducer.flush();

kafkaProducer.close();

I'm getting:

 WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error connecting to node kafka1:39092 (id: 1 rack: null)
java.net.UnknownHostException: kafka1

and the message is never pushed. Could you help me figure out whether my Docker setup or my Java producer is the problem?

Thanks in advance.


Solution

  • The problem lies in the way Kafka handles advertised listeners.

    Whenever a connection request comes into Kafka, this is what happens internally:

    1. The broker looks at which listener the request arrived on (i.e. which port the client connected to).

    2. The incoming request is mapped to one of the configured listeners:

      KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093
      
    3. It now knows the name of the listener that was hit: in your case the "EXTERNAL" listener (port 39092) is used.

    4. It uses this name to look up the matching entry in KAFKA_ADVERTISED_LISTENERS; that advertised address is what the client is told to use to actually send/receive data:

      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092
      
    5. So in your case this address is returned to the client:

      EXTERNAL://kafka1:39092
      

    You can see exactly this in your error message:

    UnknownHostException: kafka1
    

    And this makes total sense: your application runs outside of the Docker network, so it doesn't know the host "kafka1", yet that is exactly the address the broker returns as the endpoint to send data to.
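
    If you want to verify what the broker hands back, a minimal sketch along these lines (using the AdminClient from the same kafka-clients dependency; the bootstrap address is just the external port from your compose file) prints the advertised host/port of every broker. With your current config it should either print kafka1:39092 or fail with the same UnknownHostException, which is exactly what your producer is running into:

      import java.util.Properties;
      import java.util.concurrent.ExecutionException;

      import org.apache.kafka.clients.admin.Admin;
      import org.apache.kafka.clients.admin.AdminClientConfig;
      import org.apache.kafka.common.Node;

      public class AdvertisedListenerCheck {
          public static void main(String[] args) throws ExecutionException, InterruptedException {
              Properties props = new Properties();
              props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:39092");

              try (Admin admin = Admin.create(props)) {
                  // describeCluster() returns the broker endpoints exactly as the cluster advertises them
                  for (Node node : admin.describeCluster().nodes().get()) {
                      System.out.printf("broker %d advertises %s:%d%n", node.id(), node.host(), node.port());
                  }
              }
          }
      }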

    So to fix your issue, change the KAFKA_ADVERTISED_LISTENERS variable so the EXTERNAL listener advertises an address your application can actually resolve:

      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://127.0.0.1:39092
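
    After recreating the container, your original producer code should connect as-is. If you want delivery failures to surface as exceptions instead of only WARN logs, a small variation of your snippet (same topic and bootstrap address as above, assuming first_topic exists or topic auto-creation is enabled) blocks on the send result:

      import java.util.Properties;
      import java.util.concurrent.ExecutionException;

      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.common.serialization.StringSerializer;

      public class ProducerCheck {
          public static void main(String[] args) throws ExecutionException, InterruptedException {
              Properties properties = new Properties();
              properties.setProperty("bootstrap.servers", "127.0.0.1:39092");
              properties.setProperty("key.serializer", StringSerializer.class.getName());
              properties.setProperty("value.serializer", StringSerializer.class.getName());

              try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                  // send(...).get() waits for the broker acknowledgement, so a bad
                  // advertised listener shows up here as an exception rather than a silent retry loop
                  RecordMetadata metadata = producer.send(new ProducerRecord<>("first_topic", "testm")).get();
                  System.out.printf("written to %s-%d at offset %d%n",
                          metadata.topic(), metadata.partition(), metadata.offset());
              }
          }
      }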