I have a Node.js microservice and a Kafka broker running in the same cluster.
The Kafka broker and ZooKeeper are running without errors, but I am not sure how to connect to them.
kafka.yaml
# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-service
  namespace: kafka
spec:
  ports:
    - port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
        - env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              # value: 10.244.0.35:2181
              value: zookeeper-service:2181
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092
            # - name: KAFKA_ADVERTISED_HOST_NAME
            #   value: kafka-broker
            # - name: KAFKA_ADVERTISED_PORT
            #   value: "9092"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka-broker:9092
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          name: kafka-broker
          ports:
            - containerPort: 9092
Connecting using kafka-service:9092 or kafka-broker:9092 doesn't work and leads to a timeout.
kafka.js
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['PLAINTEXT://kafka-broker:9092'], // !!! connection string
})
async function createProducer() {
  const producer = kafka.producer()
  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [{ value: 'Hello KafkaJS user!' }],
  })
  await producer.disconnect()
}
createProducer()
[auth-pod] {"level":"WARN","timestamp":"2023-03-24T15:35:41.511Z","logger":"kafkajs","message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""}
[auth-pod] Listening on port 3000...
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:41.586Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":0,"retryTime":292}
[auth-pod] Connected to: mongodb://auth-mongo-srv:27017/auth
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:41.881Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":1,"retryTime":596}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:42.479Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":2,"retryTime":1184}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:43.665Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":3,"retryTime":2782}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:46.449Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":4,"retryTime":5562}
[auth-pod] {"level":"ERROR","timestamp":"2023-03-24T15:35:52.015Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).","retryCount":5,"retryTime":12506}
[auth-pod] node:internal/process/promises:288
[auth-pod] triggerUncaughtException(err, true /* fromPromise */);
[auth-pod] ^
[auth-pod]
[auth-pod] KafkaJSNonRetriableError
[auth-pod] Caused by: KafkaJSConnectionError: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).
[auth-pod] at /app/node_modules/kafkajs/src/network/connection.js:254:11
[auth-pod] ... 8 lines matching cause stack trace ...
[auth-pod] at async createProducer (/app/src/kakfka/connect.js:11:3) {
[auth-pod] name: 'KafkaJSNumberOfRetriesExceeded',
[auth-pod] retriable: false,
[auth-pod] helpUrl: undefined,
[auth-pod] retryCount: 5,
[auth-pod] retryTime: 12506,
[auth-pod] [cause]: KafkaJSConnectionError: Failed to connect: Port should be >= 0 and < 65536. Received type number (NaN).
[auth-pod] at /app/node_modules/kafkajs/src/network/connection.js:254:11
[auth-pod] at new Promise (<anonymous>)
[auth-pod] at Connection.connect (/app/node_modules/kafkajs/src/network/connection.js:167:12)
[auth-pod] at ConnectionPool.getConnection (/app/node_modules/kafkajs/src/network/connectionPool.js:56:24)
[auth-pod] at Broker.connect (/app/node_modules/kafkajs/src/broker/index.js:86:52)
[auth-pod] at async /app/node_modules/kafkajs/src/cluster/brokerPool.js:93:9
[auth-pod] at async /app/node_modules/kafkajs/src/cluster/index.js:107:14
[auth-pod] at async Cluster.connect (/app/node_modules/kafkajs/src/cluster/index.js:146:5)
[auth-pod] at async Object.connect (/app/node_modules/kafkajs/src/producer/index.js:219:7)
[auth-pod] at async createProducer (/app/src/kakfka/connect.js:11:3) {
[auth-pod] retriable: true,
[auth-pod] helpUrl: undefined,
[auth-pod] broker: 'PLAINTEXT:NaN',
[auth-pod] code: undefined,
[auth-pod] [cause]: undefined
[auth-pod] }
[auth-pod] }
[auth-pod]
[auth-pod] Node.js v18.15.0
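The broker: 'PLAINTEXT:NaN' field in the trace above hints at what may be going on: the entries in brokers seem to be read as plain host:port strings, so the PLAINTEXT:// prefix ends up as the host and the port cannot be parsed. The sketch below only imitates that with a naive split on ":"; parseSeedBroker is a hypothetical helper for illustration, not the actual KafkaJS code.

```javascript
// Hypothetical helper (not part of KafkaJS): naive "host:port" parsing,
// assumed here to approximate how a seed broker string is interpreted.
function parseSeedBroker(seedBroker) {
  const [host, port] = seedBroker.split(':')
  return { host, port: Number(port) }
}

// With a listener prefix, the scheme is taken as the host and
// "//kafka-broker" is taken as the port, which is not a number.
const withScheme = parseSeedBroker('PLAINTEXT://kafka-broker:9092')
console.log(`${withScheme.host}:${withScheme.port}`) // PLAINTEXT:NaN

// Without the prefix, host and port come out as expected.
const plain = parseSeedBroker('kafka-broker:9092')
console.log(`${plain.host}:${plain.port}`) // kafka-broker:9092
```

If that reading is right, the "Port should be >= 0 and < 65536" error is about the connection string itself and is separate from the timeouts described next.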
If I use the IP of the pod kafka-broker-5c7f7d4f77-nxlwm directly (brokers: ['10.244.0.94:9092']), I also get an error. Using the default namespace instead of a separate namespace didn't make a difference.
After switching to a StatefulSet based on this answer, I can connect using the IP of kafka-broker-0 ('10.244.0.110:9092'), but I get another error: KafkaJSProtocolError: Replication-factor is invalid. I don't know why the DNS resolution would fail, but using the name 'kafka-broker-0:9092' leads to the same error as before: "[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout".
Based on
"If you have multiple REST Proxy pods running, Kubernetes will route the traffic to one of them." (source)
I should be able to use the Kubernetes service kafka-service to load balance requests without hard-coding an IP address. (There wasn't a targetPort, but it still doesn't work after adding targetPort: 9092, although I am not sure which protocol to use.)
I looked at the logs of the kafka-broker pod and noticed an exception.
[2023-03-24 18:01:25,123] WARN [Controller id=1, targetBrokerId=1] Error connecting to node kafka-broker:9092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka-broker
at java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)
at java.base/java.net.InetAddress.getAllByName0(Unknown Source)
at java.base/java.net.InetAddress.getAllByName(Unknown Source)
at java.base/java.net.InetAddress.getAllByName(Unknown Source)
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:513)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:246)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
I think that specifying KAFKA_ADVERTISED_LISTENERS should be sufficient (answer), so I am guessing there is a problem with DNS resolution.
Using a headless service by adding clusterIP: "None" and changing the name to kafka-broker, in case PLAINTEXT://kafka-broker:9092 refers to the service and not the deployment, didn't help.
# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  clusterIP: "None"
  # ports:
  #   - protocol: TCP
  #     port: 9092
  #     targetPort: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
        - env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              # value: 10.244.0.35:2181
              value: zookeeper-service:2181
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka-broker:9092
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          name: kafka-broker
          ports:
            - containerPort: 9092
Edit:
Not sure why I had a KafkaJSProtocolError: Replication-factor is invalid error, but changing the service as follows prevents it. (It might be because I was using the same name for the service and deployment. I don't fully understand headless services, but I also added a port.)
# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  # type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      # nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-srv
  name: kafka-srv
  namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
    - name: foo
      port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
        - env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-service:2181
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka-broker:9092
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          name: kafka-broker
          ports:
            - containerPort: 9092
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['10.244.0.64:9092'],
})
async function createProducer() {
  const producer = kafka.producer()
  try {
    await producer.connect()
    console.log('connected', producer)
    // await producer.send({
    //   topic: 'test-topic',
    //   messages: [{ value: 'Hello KafkaJS user!' }],
    // })
    // await producer.disconnect()
  } catch (err) {
    console.log("Couldn't connect to broker")
    console.error(err)
  }
}
[auth-pod] connected {
[auth-pod] connect: [AsyncFunction: connect],
[auth-pod] disconnect: [AsyncFunction: disconnect],
[auth-pod] isIdempotent: [Function: isIdempotent],
[auth-pod] events: {
[auth-pod] CONNECT: 'producer.connect',
[auth-pod] DISCONNECT: 'producer.disconnect',
[auth-pod] REQUEST: 'producer.network.request',
[auth-pod] REQUEST_TIMEOUT: 'producer.network.request_timeout',
[auth-pod] REQUEST_QUEUE_SIZE: 'producer.network.request_queue_size'
[auth-pod] },
[auth-pod] on: [Function: on],
[auth-pod] send: [AsyncFunction: send],
[auth-pod] sendBatch: [AsyncFunction: sendBatch],
[auth-pod] transaction: [AsyncFunction: transaction],
[auth-pod] logger: [Function: getLogger]
[auth-pod] }
Edit 2: When connecting successfully using the IP address, I also get java.net.UnknownHostException: kafka-broker in the kafka-broker-0 pod. (The error keeps repeating as well.) I thought that the pod was being reached by kafkajs and then threw an error, but the error happens regardless. Matching the service name with the advertised host name prevents it.
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  # namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
    - name: foo
      port: 9092
  selector:
    app: kafka-broker
I can't connect to the pod kafka-broker-0:9092 directly, but now using the service name kafka-broker:9092 works.
I still don't fully understand headless services, but using the following configuration, I can connect using the service name 'kafka-broker.kafka.svc.cluster.local:9092' (see Namespaces and DNS).
# create namespace
apiVersion: v1
kind: Namespace
metadata:
  name: "kafka"
  labels:
    name: "kafka"
---
# create zookeeper service
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
  namespace: kafka
spec:
  # type: NodePort
  ports:
    - name: zookeeper-port
      port: 2181
      # nodePort: 30181
      targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - image: wurstmeister/zookeeper
          imagePullPolicy: IfNotPresent
          name: zookeeper
          ports:
            - containerPort: 2181
---
# deploy kafka broker
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # headless service
  clusterIP: "None"
  ports:
    - name: foo
      port: 9092
  selector:
    app: kafka-broker
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka-broker
  name: kafka-broker
  namespace: kafka
spec:
  # replicas: 1
  selector:
    matchLabels:
      app: kafka-broker
  template:
    metadata:
      labels:
        app: kafka-broker
    spec:
      hostname: kafka-broker
      containers:
        - env:
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-service:2181
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://kafka-broker:9092
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          name: kafka-broker
          ports:
            - containerPort: 9092
      # TODO add persistent volume
Edit: there is an issue when using namespaces
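For reference, the fully qualified name that worked follows the pattern <service>.<namespace>.svc.<cluster-domain>, which is how a Service is addressed from a pod in a different namespace. A minimal sketch of building the broker address that way; serviceAddress is a hypothetical helper, and cluster.local is only assumed to be the cluster domain (it is the usual default, but clusters can be configured differently).

```javascript
// Hypothetical helper: builds "<service>.<namespace>.svc.<clusterDomain>:<port>".
// The cluster domain default of "cluster.local" is an assumption.
function serviceAddress(service, namespace, port, clusterDomain = 'cluster.local') {
  return `${service}.${namespace}.svc.${clusterDomain}:${port}`
}

// Broker list for the headless kafka-broker service in the kafka namespace.
// Note that there is no PLAINTEXT:// prefix, just host:port.
const brokers = [serviceAddress('kafka-broker', 'kafka', 9092)]
console.log(brokers)
```

The resulting array could be passed as the brokers option of the Kafka client from the snippets above, so the address is not hard-coded as a pod IP.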