
Kafkajs connection string for service running in same Kubernetes cluster


I have a Node.js microservice and a Kafka broker running in the same cluster.

The Kafka broker and ZooKeeper are running without errors, but I am not sure how to connect to them.

kafka.yaml

# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-service
  namespace: kafka
spec:
  ports:
  - port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          # value: 10.244.0.35:2181
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        # - name: KAFKA_ADVERTISED_HOST_NAME
        #   value: kafka-broker
        # - name: KAFKA_ADVERTISED_PORT
        #   value: "9092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-broker:9092
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        name: kafka-broker
        ports:
        - containerPort: 9092


Connecting using kafka-service:9092 or kafka-broker:9092 doesn't work and leads to a timeout.

kafka.js

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['PLAINTEXT://kafka-broker:9092'], // !!! connection string; kafkajs expects plain host:port, so this becomes broker 'PLAINTEXT:NaN'
})

async function createProducer() {
  const producer = kafka.producer()

  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [{ value: 'Hello KafkaJS user!' }],
  })

  await producer.disconnect()
}

createProducer()
[auth-pod] {"level":"WARN","timestamp":"2023-03-24T15:35:41.511Z","logger":"kafkajs","message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""}
[auth-pod] Listening on port 3000...
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:41.586Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":0,"retryTime":292}     
[auth-pod] Connected to: mongodb://auth-mongo-srv:27017/auth
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:41.881Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":1,"retryTime":596}     
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:42.479Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":2,"retryTime":1184}    
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:43.665Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":3,"retryTime":2782}    
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:46.449Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":4,"retryTime":5562}    
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:52.015Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":5,"retryTime":12506}   
[auth-pod] node:internal/process/promises:288
[auth-pod]             triggerUncaughtException(err, true /* fromPromise */);
[auth-pod]             ^
[auth-pod]
[auth-pod] KafkaJSNonRetriableError
[auth-pod]   Caused by: KafkaJSConnectionError: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).
[auth-pod]     at /app/node_modules/kafkajs/src/network/connection.js:254:11
[auth-pod]     ... 8 lines matching cause stack trace ...
[auth-pod]     at async createProducer (/app/src/kakfka/connect.js:11:3) {
[auth-pod]   name: 'KafkaJSNumberOfRetriesExceeded',
[auth-pod]   retriable: false,
[auth-pod]   helpUrl: undefined,
[auth-pod]   retryCount: 5,
[auth-pod]   retryTime: 12506,
[auth-pod]   [cause]: KafkaJSConnectionError: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).
[auth-pod]       at /app/node_modules/kafkajs/src/network/connection.js:254:11
[auth-pod]       at new Promise (<anonymous>)
[auth-pod]       at Connection.connect (/app/node_modules/kafkajs/src/network/connection.js:167:12)
[auth-pod]       at ConnectionPool.getConnection (/app/node_modules/kafkajs/src/network/connectionPool.js:56:24)
[auth-pod]       at Broker.connect (/app/node_modules/kafkajs/src/broker/index.js:86:52)
[auth-pod]       at async /app/node_modules/kafkajs/src/cluster/brokerPool.js:93:9
[auth-pod]       at async /app/node_modules/kafkajs/src/cluster/index.js:107:14
[auth-pod]       at async Cluster.connect (/app/node_modules/kafkajs/src/cluster/index.js:146:5)
[auth-pod]       at async Object.connect (/app/node_modules/kafkajs/src/producer/index.js:219:7)
[auth-pod]       at async createProducer (/app/src/kakfka/connect.js:11:3) {
[auth-pod]     retriable: true,
[auth-pod]     helpUrl: undefined,
[auth-pod]     broker: 'PLAINTEXT:NaN',
[auth-pod]     code: undefined,
[auth-pod]     [cause]: undefined
[auth-pod]   }
[auth-pod] }
[auth-pod]
[auth-pod] Node.js v18.15.0
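The broker: 'PLAINTEXT:NaN' field in this trace hints at the cause: kafkajs expects each brokers entry to be a plain host:port string, without a listener name such as PLAINTEXT://. The sketch below mimics a naive split on ':' (an illustration, not kafkajs source) to show how the listener-style string ends up with host 'PLAINTEXT' and port NaN. splitHostPort and toKafkaJsBroker are hypothetical helpers.

```javascript
// Illustration only: mimics splitting a brokers entry on ':' into host and port.
// This is not kafkajs source code, just a way to reproduce 'PLAINTEXT:NaN'.
function splitHostPort(broker) {
  const [host, port] = broker.split(':')
  return { host, port: Number(port) }
}

// Hypothetical helper: strips a leading listener name like 'PLAINTEXT://'
// so that only the host:port part that kafkajs expects remains.
function toKafkaJsBroker(entry) {
  return entry.replace(/^[A-Za-z0-9_]+:\/\//, '')
}

// 'PLAINTEXT://kafka-broker:9092' splits into 'PLAINTEXT', '//kafka-broker', '9092',
// so the "port" is Number('//kafka-broker'), which is NaN.
console.log(splitHostPort('PLAINTEXT://kafka-broker:9092'))
// After stripping the prefix: { host: 'kafka-broker', port: 9092 }
console.log(splitHostPort(toKafkaJsBroker('PLAINTEXT://kafka-broker:9092')))
```

Removing the prefix only fixes the NaN error; the host name in the entry still has to resolve from the client pod.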

If I use the IP of the pod kafka-broker-5c7f7d4f77-nxlwm directly (brokers: ['10.244.0.94:9092']), I also get an error. Using the default namespace instead of a separate one made no difference.

After switching to a StatefulSet based on this answer, I can connect using the IP of kafka-broker-0 ('10.244.0.110:9092'), but I get another error: KafkaJSProtocolError: Replication-factor is invalid. Using the name 'kafka-broker-0:9092' instead leads to the same error as before, "[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout", so DNS resolution seems to fail, and I don't know why.
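A likely reason the bare name kafka-broker-0 does not resolve: Kubernetes DNS does not publish plain pod names. A StatefulSet pod gets a stable name only through its governing headless service, in the form <pod>.<service>.<namespace>.svc.<cluster-domain>. The hypothetical helper below just assembles that name; it assumes the StatefulSet's spec.serviceName points at a headless service (the StatefulSet manifests in this question do not set serviceName) and the default cluster domain cluster.local.

```javascript
// Hypothetical helper: builds the per-pod DNS name that Kubernetes publishes for a
// StatefulSet pod behind a headless service. Assumes spec.serviceName === service
// and the default cluster domain 'cluster.local'.
function statefulSetPodAddress({ statefulSet, ordinal, service, namespace, port, clusterDomain = 'cluster.local' }) {
  const pod = `${statefulSet}-${ordinal}` // StatefulSet pods are named <statefulSet>-<ordinal>
  return `${pod}.${service}.${namespace}.svc.${clusterDomain}:${port}`
}

// Prints kafka-broker-0.kafka-broker.kafka.svc.cluster.local:9092
console.log(
  statefulSetPodAddress({ statefulSet: 'kafka-broker', ordinal: 0, service: 'kafka-broker', namespace: 'kafka', port: 9092 })
)
```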

Based on this statement:

If you have multiple REST Proxy pods running, Kubernetes will route the traffic to one of them.

I should be able to use the Kubernetes service kafka-service to load-balance requests without hard-coding an IP address. (The service originally had no targetPort, but it still doesn't work after adding targetPort: 9092, although I am not sure which protocol to use.)
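One caveat on the load-balancing idea: a Kafka client uses its brokers list only to bootstrap. It fetches cluster metadata from whichever seed broker answers and then opens new connections to the host and port each broker advertises (KAFKA_ADVERTISED_LISTENERS). A Service can therefore route the first connection, but the advertised name still has to resolve from the client pod. The sketch below models that flow; connectionTargets and the shape of the metadata object are hypothetical, not kafkajs APIs.

```javascript
// Hypothetical model of the bootstrap flow (not kafkajs internals):
// seed brokers are only used for the first metadata request, after which the
// client dials each broker at the host:port it advertises.
function connectionTargets(seedBrokers, metadata) {
  return {
    bootstrap: seedBrokers,
    afterMetadata: metadata.brokers.map(({ host, port }) => `${host}:${port}`),
  }
}

// Assumed metadata for a single broker advertising PLAINTEXT://kafka-broker:9092
const metadata = { brokers: [{ nodeId: 1, host: 'kafka-broker', port: 9092 }] }

// Even though the first request goes through kafka-service, later requests go
// to 'kafka-broker:9092', which must be resolvable from the client pod.
console.log(connectionTargets(['kafka-service:9092'], metadata))
```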


I looked at the logs of the kafka-broker pod and noticed an exception.

[2023-03-24 18:01:25,123] WARN [Controller id=1, targetBrokerId=1] Error connecting to node kafka-broker:9092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka-broker
 at java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)
 at java.base/java.net.InetAddress.getAllByName0(Unknown Source)
 at java.base/java.net.InetAddress.getAllByName(Unknown Source)
 at java.base/java.net.InetAddress.getAllByName(Unknown Source)
 at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
 at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)
 at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:513)
 at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
 at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)
 at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)
 at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
 at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
 at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
 at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

I think that specifying KAFKA_ADVERTISED_LISTENERS should be sufficient (based on this answer), so I am guessing there is a problem with DNS resolution.
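The exception above shows the broker's own controller ([Controller id=1, targetBrokerId=1]) trying to reach kafka-broker:9092, the host taken from KAFKA_ADVERTISED_LISTENERS, so that name has to resolve inside the broker pod as well as in every client pod. When checking a manifest it can help to pull the host out of the listener string. parseListeners is a hypothetical helper for the comma-separated NAME://host:port format used by KAFKA_LISTENERS and KAFKA_ADVERTISED_LISTENERS; an empty host, as in PLAINTEXT://:9092, means the broker binds to its default interface.

```javascript
// Hypothetical helper: parses a Kafka listener list such as
// 'PLAINTEXT://kafka-broker:9092' (several listeners are comma-separated).
// An empty host, as in 'PLAINTEXT://:9092', is returned as ''.
function parseListeners(value) {
  return value.split(',').map((entry) => {
    const match = /^([A-Za-z0-9_]+):\/\/([^:]*):(\d+)$/.exec(entry.trim())
    if (!match) throw new Error(`Unrecognized listener: ${entry}`)
    const [, name, host, port] = match
    return { name, host, port: Number(port) }
  })
}

// The advertised host is the name clients (and the controller) will try to resolve.
console.log(parseListeners('PLAINTEXT://kafka-broker:9092'))
console.log(parseListeners('PLAINTEXT://:9092'))
```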

I also tried a headless service: I added clusterIP: "None" and renamed the service to kafka-broker, in case PLAINTEXT://kafka-broker:9092 resolves to the service rather than to the pod. That didn't help either.

# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  clusterIP: "None"
  # ports:
  # - protocol: TCP
  #   port: 9092
  #   targetPort: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          # value: 10.244.0.35:2181
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-broker:9092
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        name: kafka-broker
        ports:
        - containerPort: 9092


Edit: I'm not sure why I got the KafkaJSProtocolError: Replication-factor is invalid error, but changing the service as follows prevents it. (It might be because I was using the same name for the service and the StatefulSet. I don't fully understand headless services, but I also added a port.)

# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  # type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      # nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-srv
  name: kafka-srv
  namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
  - name: foo
    port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
      - env:
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_LISTENERS
          value: PLAINTEXT://:9092
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-broker:9092
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        name: kafka-broker
        ports:
        - containerPort: 9092

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['10.244.0.64:9092'],
})

async function createProducer() {
  const producer = kafka.producer()

  try {
    await producer.connect()
    console.log('connected', producer)
    // await producer.send({
    //   topic: 'test-topic',
    //   messages: [{ value: 'Hello KafkaJS user!' }],
    // })

    // await producer.disconnect()
  } catch (err) {
    console.log("Couldn't connect to broker")
    console.error(err)
  }
}

createProducer()

[auth-pod] connected {
[auth-pod]   connect: [AsyncFunction: connect],
[auth-pod]   disconnect: [AsyncFunction: disconnect],
[auth-pod]   isIdempotent: [Function: isIdempotent],
[auth-pod]   events: {
[auth-pod]     CONNECT: 'producer.connect',
[auth-pod]     DISCONNECT: 'producer.disconnect',
[auth-pod]     REQUEST: 'producer.network.request',
[auth-pod]     REQUEST_TIMEOUT: 'producer.network.request_timeout',
[auth-pod]     REQUEST_QUEUE_SIZE: 'producer.network.request_queue_size'
[auth-pod]   },
[auth-pod]   on: [Function: on],
[auth-pod]   send: [AsyncFunction: send],
[auth-pod]   sendBatch: [AsyncFunction: sendBatch],
[auth-pod]   transaction: [AsyncFunction: transaction],
[auth-pod]   logger: [Function: getLogger]
[auth-pod] }
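About headless services: with clusterIP: "None" there is no virtual service IP, so the service's DNS name resolves directly to the IPs of the ready pods it selects. One way to check what a name resolves to from inside the client pod is Node's built-in dns.promises.lookup with all: true. resolveAll is a hypothetical wrapper; the service name in the commented usage line is taken from the manifest above.

```javascript
const dns = require('node:dns')

// Hypothetical wrapper: returns every address the system resolver reports for a name.
// For a headless service this should be the pod IP(s) rather than a ClusterIP.
async function resolveAll(hostname) {
  const results = await dns.promises.lookup(hostname, { all: true })
  return results.map(({ address }) => address)
}

// Run inside the client pod, e.g. with the headless service defined above:
// resolveAll('kafka-srv.kafka.svc.cluster.local').then(console.log)
resolveAll('localhost').then((addresses) => console.log(addresses))
```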

Edit 2: Even when connecting successfully via the IP address, the kafka-broker-0 pod logs java.net.UnknownHostException: kafka-broker, and the error keeps repeating. I thought kafkajs was reaching the pod and the pod then threw the error, but the error occurs regardless. Giving the service the same name as the advertised host name prevents it.

---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  # namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
  - name: foo
    port: 9092
  selector:
    app: kafka-broker

I can't connect to the pod kafka-broker-0:9092 directly, but now using the service name kafka-broker:9092 works.
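Since the service name now works, the broker list no longer needs a hard-coded pod IP such as '10.244.0.64:9092'. A minimal sketch, assuming a hypothetical KAFKA_BROKERS environment variable (comma-separated host:port entries) that falls back to the service's fully qualified name, kafka-broker.kafka.svc.cluster.local:9092:

```javascript
// Hypothetical helper: KAFKA_BROKERS is an assumed variable name, not a kafkajs setting.
// Entries must be plain host:port strings, without a PLAINTEXT:// prefix.
function brokersFromEnv(env, fallback = 'kafka-broker.kafka.svc.cluster.local:9092') {
  const raw = env.KAFKA_BROKERS || fallback
  return raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0)
}

console.log(brokersFromEnv(process.env))
```

The result can then be passed as new Kafka({ clientId: 'my-app', brokers: brokersFromEnv(process.env) }), which keeps the address in the Kubernetes manifest rather than in the code.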


Solution

  • I still don't fully understand headless services, but with the following configuration I can connect using the service's fully qualified name 'kafka-broker.kafka.svc.cluster.local:9092' (see the Kubernetes documentation on Namespaces and DNS).

    # create namespace
    apiVersion: v1
    kind: Namespace
    metadata:
      name: "kafka"
      labels:
        name: "kafka"
    ---
    # create zookeeper service
    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: zookeeper-service
      name: zookeeper-service
      namespace: kafka
    spec:
      # type: NodePort
      ports:
        - name: zookeeper-port
          port: 2181
          # nodePort: 30181
          targetPort: 2181
      selector:
        app: zookeeper
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      labels:
        app: zookeeper
      name: zookeeper
      namespace: kafka
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: zookeeper
      template:
        metadata:
          labels:
            app: zookeeper
        spec:
          containers:
            - image: wurstmeister/zookeeper
              imagePullPolicy: IfNotPresent
              name: zookeeper
              ports:
                - containerPort: 2181
    ---
    # deploy kafka broker
    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: kafka-broker
      name: kafka-broker
      namespace: kafka
    spec:
      # headless service
      clusterIP: "None"
      ports:
      - name: foo
        port: 9092
      selector:
        app: kafka-broker
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      labels:
        app: kafka-broker
      name: kafka-broker
      namespace: kafka
    spec:
      # replicas: 1
      selector:
        matchLabels:
          app: kafka-broker
      template:
        metadata:
          labels:
            app: kafka-broker
        spec:
          hostname: kafka-broker
          containers:
          - env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-service:2181
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka-broker:9092
            image: wurstmeister/kafka
            imagePullPolicy: IfNotPresent
            name: kafka-broker
            ports:
            - containerPort: 9092
    # TODO add persistent volume
    

    Edit: there is an issue when using namespaces
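One possible explanation for the namespace issue, assuming the client pod runs in a different namespace (for example default): the broker advertises the short name kafka-broker, and a short name is expanded using the pod's DNS search list, which by default starts with the pod's own namespace. From default, kafka-broker therefore becomes kafka-broker.default.svc.cluster.local, which does not exist. The hypothetical searchCandidates below lists the names tried, assuming the usual search list <namespace>.svc.cluster.local, svc.cluster.local, cluster.local. If this is the cause, advertising the fully qualified name (KAFKA_ADVERTISED_LISTENERS set to PLAINTEXT://kafka-broker.kafka.svc.cluster.local:9092) would stop the advertised address from depending on the client's namespace.

```javascript
// Hypothetical sketch of short-name expansion using the typical Kubernetes pod
// search list (assumes cluster domain 'cluster.local'; with the default ndots:5,
// names with fewer than 5 dots are tried against these suffixes first).
function searchCandidates(name, clientNamespace, clusterDomain = 'cluster.local') {
  return [
    `${name}.${clientNamespace}.svc.${clusterDomain}`,
    `${name}.svc.${clusterDomain}`,
    `${name}.${clusterDomain}`,
  ]
}

// From a pod in 'default', the advertised short name never reaches the 'kafka' namespace:
console.log(searchCandidates('kafka-broker', 'default'))
// Adding the namespace ('kafka-broker.kafka') matches the second candidate:
console.log(searchCandidates('kafka-broker.kafka', 'default'))
```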