Tags: python, apache-kafka, airflow, kafka-python

DNS lookup failed for broker


I am new to Kafka and currently attempting to send data from Airflow to a Kafka broker. However, I'm encountering an issue where it reports a DNS lookup failure for the broker.

broker service in docker-compose.yml

broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
      interval: 10s
      timeout: 5s
      retries: 5

kafka_stream.py

def stream_data():
    from kafka import KafkaProducer
    res = get_data()
    res = format_data(res)
    print("Function Started")
    print(res)

    try:
        producer = KafkaProducer(bootstrap_servers=['broker:29092'], api_version=(2, 5, 0), max_block_ms=500000)
        producer.send('user_created', res)
    except Exception as e:
        print("Failed to connect to Kafka broker : ", e)

with DAG('user_automation',
         default_args=default_args,
         schedule_interval='@daily',
         ) as dag:
    streaming_task = PythonOperator(
        task_id='stream_data_from_api',
        python_callable=stream_data
    )

Error:

[2024-07-28, 16:04:19 UTC] {conn.py:1276} WARNING - DNS lookup failed for broker:29092, exception was [Errno -3] Temporary failure in name resolution. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
[2024-07-28, 16:04:19 UTC] {conn.py:297} ERROR - DNS lookup failed for broker:29092 (0)

Here is my complete code: https://github.com/dhainiksuthar/airflow-kafka


Solution

  • Containers can only resolve each other by name when they are attached to the same Docker network.

    Looking at your repo, the Airflow services are on the default bridge network rather than the isolated confluent one, so the hostname broker cannot be resolved from the Airflow task.

    Remove the networks lists from the services. They will then all join the same default network, and broker:29092 will resolve.
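
    To confirm whether a container can resolve the broker name, before and after changing the networks, a small lookup check can help. This is only a minimal sketch: the helper name can_resolve is made up for illustration, and it uses nothing but the standard library, so it tests name resolution only and not whether Kafka is actually reachable.

```python
import socket


def can_resolve(host: str, port: int) -> bool:
    """Return True if `host` resolves to at least one address, else False.

    Hypothetical helper for illustration; not part of kafka-python or Airflow.
    """
    try:
        # getaddrinfo performs a name lookup of the kind a client needs
        # before it can open a TCP connection to host:port.
        results = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
        return len(results) > 0
    except socket.gaierror:
        # Name resolution failed, e.g. "[Errno -3] Temporary failure in
        # name resolution" when the container is on a different network.
        return False
```

    Calling can_resolve("broker", 29092) from a Python shell inside one of the Airflow containers should return False while the services are on different networks, and True once they share one.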