
Why is the Flink Kafka client trying to connect to localhost:9092 when it is set up to connect to 172.17.0.1:9092?


I am trying to set up a Flink JobManager and TaskManager with docker-compose, using this config:

version: "3.7"
services:
  jobmanagerconfig:
    image: flink:1.13.2-scala_2.12
    expose:
      - "6133"
      - "6123"
    ports:
      - "8085:8081"
    command: standalone-job --job-classname net.mongerbot.configManager.App
    volumes:
      - ./usrlib/:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanagerconfig
        parallelism.default: 2
        taskmanager.numberOfTaskSlots: 4
      - KAFKA_URI=${KAFKA_URI}
      - KAFKA_PORT=${KAFKA_PORT}
      - KAFKA_groupId=${KAFKA_groupId}
  taskmanagerconfig:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanagerconfig
    links:
      - jobmanagerconfig
    command: taskmanager
#    scale: 1
    volumes:
      - ./usrlib/:/opt/flink/usrlib
    environment:
      - KAFKA_URI=${KAFKA_URI}
      - KAFKA_PORT=${KAFKA_PORT}
      - KAFKA_groupId=${KAFKA_groupId}
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanagerconfig
        parallelism.default: 2
        taskmanager.numberOfTaskSlots: 4
volumes:
  usrlib:

networks:
  default:
    external:
      name: mongerbot_network

The environment variables have the correct values in both containers, and, as the log shows, the Kafka client is configured to connect to 172.17.0.1:9092 as well:

docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,065 INFO  org.apache.kafka.clients.consumer.ConsumerConfig             [] - ConsumerConfig values: 
docker-taskmanagerconfig-1  |   allow.auto.create.topics = true
docker-taskmanagerconfig-1  |   auto.commit.interval.ms = 5000
docker-taskmanagerconfig-1  |   auto.offset.reset = latest
docker-taskmanagerconfig-1  |   bootstrap.servers = [172.17.0.1:9092]
docker-taskmanagerconfig-1  |   check.crcs = true
docker-taskmanagerconfig-1  |   client.dns.lookup = default
docker-taskmanagerconfig-1  |   client.id = 
docker-taskmanagerconfig-1  |   client.rack = 
docker-taskmanagerconfig-1  |   connections.max.idle.ms = 540000
docker-taskmanagerconfig-1  |   default.api.timeout.ms = 60000
docker-taskmanagerconfig-1  |   enable.auto.commit = true
docker-taskmanagerconfig-1  |   exclude.internal.topics = true
...

But these are the next lines of the log, immediately after the Kafka client config:

docker-taskmanagerconfig-1  |   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
docker-taskmanagerconfig-1  | 
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,084 INFO  org.apache.kafka.clients.consumer.KafkaConsumer              [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Subscribed to partition(s): config.subscribe-0, config.subscribe-2
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,090 INFO  org.apache.kafka.clients.Metadata                            [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Cluster ID: s2iVODWcQ2Kbw4R5jL6RCw
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,091 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,094 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,094 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,094 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka version: 2.4.1
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,095 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka commitId: c57222ae8cd7866b
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,095 INFO  org.apache.kafka.common.utils.AppInfoParser                  [] - Kafka startTimeMs: 1670492216094
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,096 INFO  org.apache.kafka.clients.consumer.KafkaConsumer              [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Subscribed to partition(s): config.subscribe-1
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,101 INFO  org.apache.kafka.clients.Metadata                            [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Cluster ID: s2iVODWcQ2Kbw4R5jL6RCw
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,102 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,103 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Connection to node 2147483646 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,104 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-configManager-8, groupId=configManager] Group coordinator localhost:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,197 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=consumer-configManager-7, groupId=configManager] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
docker-taskmanagerconfig-1  | 2022-12-08 09:36:56,207 WARN  org.apache.kafka.clients.NetworkClient 

As you can see, it is trying to connect to localhost:9092.


Solution

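  • This problem was not related to Flink or the Kafka consumer; it was caused by the configuration of the Kafka server itself. The broker must advertise an address that the client can reach (172.17.0.1 here), but it was set up to advertise only kafka:29092 and localhost:9092. After bootstrapping against 172.17.0.1:9092, the client is told to use the advertised address localhost:9092 and connects to that instead: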

    version: '2'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 9092:9092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    

    I changed PLAINTEXT_HOST://localhost:9092 to PLAINTEXT_HOST://172.17.0.1:9092 and that fixed it. (It was confusing because other clients, such as Conduktor, could connect to Kafka at 172.17.0.1:9092 even though this address was not in KAFKA_ADVERTISED_LISTENERS. Presumably those clients ran on the host itself, where the advertised localhost:9092 also reaches the broker.)
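
    For reference, a minimal sketch of how the kafka service's environment might look after that change. Only the PLAINTEXT_HOST advertised address differs from the file above; 172.17.0.1 is the Docker bridge gateway on this particular setup and is an assumption that may not hold on other hosts.

    ```yaml
    # Fragment of the broker's docker-compose file after the fix (sketch).
    services:
      kafka:
        environment:
          # Addresses the broker hands back to clients in metadata responses.
          # The PLAINTEXT_HOST entry must be reachable from the Flink containers;
          # 172.17.0.1 is specific to this setup (assumption).
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.17.0.1:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    ```

    After restarting the broker, the "Discovered group coordinator" line in the taskmanager log should show the new advertised address rather than localhost:9092.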