Tags: kubernetes, apache-kafka, minikube, kcat

Connecting to Kafka from another namespace inside k8s


I have the following configuration for my Kafka and ZooKeeper in my minikube cluster:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: kafka
spec:
  selector:
    app: kafka
  ports:
    - protocol: TCP
      port: 9092
      name: kafka-port
    - protocol: TCP
      port: 9094
      name: kafka-port-out
    - protocol: TCP
      port: 2181
      name: kafka-zk
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
      k8s-app: kube-dns
  template:
    metadata:
      labels:
        app: kafka
        k8s-app: kube-dns
    spec:
      containers:
        - name: kafka-container
          image: bitnami/kafka:latest
          env:
          - name: 'ALLOW_PLAINTEXT_LISTENER'
            value: 'yes'
          - name: 'KAFKA_CFG_ZOOKEEPER_CONNECT'
            value: 'zookeeper-service:2181'
          - name: 'KAFKA_CFG_LISTENERS'
            value: 'PLAINTEXT://:9092'
          - name: 'KAFKA_CFG_ADVERTISED_LISTENERS' # if I comment this and the next line it works only locally
            value: 'PLAINTEXT://kafka-service.kafka:9092'
          ports:
            - containerPort: 9092
              name: kafka-port
            - containerPort: 9094
              name: kafka-port-out
            - containerPort: 5555
              name: kafka-port-jmx
            - containerPort: 2181
              name: kafka-zk

This is the configuration for my ZooKeeper:

apiVersion: v1
kind: Service
metadata:
  name: zookeeper-service
  namespace: kafka
spec:
  selector:
    app: zookeeper
  ports:
    - protocol: TCP
      port: 2181
      name: zookeeper-port
    - protocol: TCP
      port: 2888
      name: zookeeper-peer
    - protocol: TCP
      port: 3888
      name: leader-election
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-deployment
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
      k8s-app: kube-dns
  template:
    metadata:
      labels:
        app: zookeeper
        k8s-app: kube-dns
    spec:
      containers:
        - name: zookeeper-container
          image: bitnami/zookeeper:latest
          env:
            - name: 'ALLOW_ANONYMOUS_LOGIN'
              value: 'yes'
            - name: 'ZOOKEEPER_ID'
              value: '1'
          ports:
            - containerPort: 2181
              name: zookeeper-port
            - containerPort: 2888
              name: zookeeper-peer
            - containerPort: 3888
              name: leader-election

And I have another deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafkacat-deployment
#  namespace: debug # I was using it from another namespace, but it was not working so I've tried to use the same
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafkacat
      k8s-app: kube-dns
  template:
    metadata:
      labels:
        app: kafkacat
        k8s-app: kube-dns
    spec:
      containers:
      - name: kafkacat-container
        image: edenhill/kafkacat:1.5.0

Then I try to telnet to it, and it works:

telnet kafka-service.kafka 9092
Trying 10.101.87.127...
Connected to kafka-service.kafka.svc.cluster.local.
Escape character is '^]'.

Here is the nslookup output:

nslookup kafka-service.kafka
Server:     10.96.0.10
Address:    10.96.0.10#53

Name:   kafka-service.kafka.svc.cluster.local
Address: 10.101.87.127

But when I try to reach it with kafkacat, this is what I get:

kafkacat -b kafka-service.kafka:9092 -L
% ERROR: Failed to acquire metadata: Local: Timed out

So my guess is that the problem is in the Kafka configuration: if I comment out the env var KAFKA_CFG_ADVERTISED_LISTENERS, it works like this:

# kafkacat -b kafka-service.kafka:9092 -L
Metadata for all topics (from broker -1: kafka-service.kafka:9092/bootstrap):
 1 brokers:
  broker 1001 at kafka-deployment-858c5c7f98-tt7sr:9092
 2 topics:
  topic "my_topic" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
  topic "__consumer_offsets" with 50 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001

If I try to produce a message:

kafkacat -b kafka-service.kafka:9092 -P -t my_topic
oi
% ERROR: Local: Host resolution failure: kafka-deployment-858c5c7f98-w2dm5:9092/1001: Failed to resolve 'kafka-deployment-858c5c7f98-w2dm5:9092': Temporary failure in name resolution (after 15382734ms in state INIT)

Then if I try to consume:

kafkacat -b kafka-service.kafka:9092 -C -t my_topic
% ERROR: Local: Host resolution failure: kafka-deployment-858c5c7f98-w2dm5:9092/1001: Failed to resolve 'kafka-deployment-858c5c7f98-w2dm5:9092': Temporary failure in name resolution (after 15406287ms in state INIT)

I have tried to configure KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092, but I still get a timeout when I try to fetch the metadata with kafkacat.
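
For reference, that attempt amounts to changing only the advertised-listener value in the env block of the kafka container above, roughly like this (the rest of the manifest stays the same):

          env:
          - name: 'ALLOW_PLAINTEXT_LISTENER'
            value: 'yes'
          - name: 'KAFKA_CFG_ZOOKEEPER_CONNECT'
            value: 'zookeeper-service:2181'
          - name: 'KAFKA_CFG_LISTENERS'
            value: 'PLAINTEXT://:9092'
          - name: 'KAFKA_CFG_ADVERTISED_LISTENERS'
            value: 'PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092'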

With the advertised listener left unset, on the other hand, the broker advertises its pod hostname, which is not resolvable from other pods or from my local machine. What can I do to fix the Kafka configuration in the cluster?


Solution

  • I applied the exact same manifests you provided, except for adding a tail -f /dev/null command to the kafkacat-deployment pod, and was able to produce and consume topics.

    I am running a k3s cluster in k3d:

    └[~]> kubectl get nodes                                                    
    NAME               STATUS   ROLES    AGE   VERSION
    k3d-dev-server     Ready    master   31m   v1.17.3+k3s1
    k3d-dev-worker-1   Ready    <none>   31m   v1.17.3+k3s1
    

    kafkacat-deployment manifest:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kafkacat-deployment
      #  namespace: debug # I was using it from another namespace, but it was not working so I've tried to use the same
      namespace: kafka
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafkacat
          k8s-app: kube-dns
      template:
        metadata:
          labels:
            app: kafkacat
            k8s-app: kube-dns
        spec:
          containers:
            - name: kafkacat-container
              image: edenhill/kafkacat:1.5.0
              resources: {}
              command:
                - sh
                - -c
                - "exec tail -f /dev/null"
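
    The "exec tail -f /dev/null" command is only there to keep the kafkacat container running: the edenhill/kafkacat image exits immediately when started without arguments, so without it the pod never stays up long enough to exec into. With the pod alive you can shell into it and run kafkacat by hand, for example (assuming the manifest above is saved as kafkacat-deployment.yaml; replace the pod name with whatever your cluster generates):

    kubectl apply -f kafkacat-deployment.yaml
    kubectl exec -it -n kafka <kafkacat-pod-name> -- ash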
    

    Terminal logs from the kafkacat-deployment pod in the kafka namespace:

    / # kafkacat -b kafka-service.kafka:9092 -L
    Metadata for all topics (from broker 1001: kafka-service.kafka:9092/1001):
     1 brokers:
      broker 1001 at kafka-service.kafka:9092 (controller)
     0 topics:
    / # kafkacat -b kafka-service.kafka:9092 -P -t my_topic
    hi from kafkacat
    
    / # kafkacat -b kafka-service.kafka:9092 -C -t my_topic
    hi from kafkacat
    % Reached end of topic my_topic [0] at offset 1
    
    / # kafkacat -b kafka-service:9092 -L
    Metadata for all topics (from broker -1: kafka-service:9092/bootstrap):
     1 brokers:
      broker 1001 at kafka-service.kafka:9092 (controller)
     1 topics:
      topic "my_topic" with 1 partitions:
        partition 0, leader 1001, replicas: 1001, isrs: 1001
    / # kafkacat -b kafka-service:9092 -P -t my_topic
    hi from kafka2
    
    / # kafkacat -b kafka-service:9092 -C -t my_topic
    hi from kafkacat
    hi from kafka2
    % Reached end of topic my_topic [0] at offset 2
    

    Terminal logs from the kafkacat-deployment pod in the debug namespace:

    └[~]> kubectl exec -it kafkacat-deployment-76f9c9db6d-8fth4 -n debug -- ash
    / # kafkacat -b kafka-service.kafka:9092 -L
    Metadata for all topics (from broker 1001: kafka-service.kafka:9092/1001):
     1 brokers:
      broker 1001 at kafka-service.kafka:9092 (controller)
     1 topics:
      topic "my_topic" with 1 partitions:
        partition 0, leader 1001, replicas: 1001, isrs: 1001
    / # kafkacat -b kafka-service.kafka:9092 -P -t my_topic
    hi from debug namespace
    
    / # kafkacat -b kafka-service.kafka:9092 -C -t my_topic
    hi from kafkacat
    hi from kafka2
    hi from debug namespace
    % Reached end of topic my_topic [0] at offset 3
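
    In other words, the only thing that changes from the debug namespace is the DNS name: the service has to be addressed as kafka-service.kafka (or the fully qualified kafka-service.kafka.svc.cluster.local), whereas inside the kafka namespace the short kafka-service name is enough. Spelling out the full cluster DNS name should behave exactly the same way, e.g.:

    kafkacat -b kafka-service.kafka.svc.cluster.local:9092 -L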
    

    I got it to work in minikube as well, after applying a workaround for a known minikube bug:

    minikube ssh
    sudo ip link set docker0 promisc on
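
    One thing to keep in mind: the promiscuous-mode setting lives inside the minikube VM, so it does not survive a restart of the VM and has to be re-applied after every minikube stop/start; on minikube versions that accept a command after ssh this can be done in one line:

    minikube ssh -- sudo ip link set docker0 promisc on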
    

    Then I deleted the running kafka-deployment pod so it would be recreated, and it worked from both namespaces:

    (⎈ |minikube:default)➜  ~ kubectl delete pod -n kafka kafka-deployment-5c4f64599f-kn9xt
    pod "kafka-deployment-5c4f64599f-kn9xt" deleted
    (⎈ |minikube:default)➜  ~ kubectl exec -n kafka -it kafkacat-deployment-b595d9ccd-4bht7 -- ash
    / # kafkacat -b kafka-service.kafka:9092 -L
    Metadata for all topics (from broker 1003: kafka-service.kafka:9092/1003):
     1 brokers:
      broker 1003 at kafka-service.kafka:9092 (controller)
     0 topics:
    / # 
    (⎈ |minikube:default)➜  ~ kubectl exec -n debug -it kafkacat-deployment-b595d9ccd-pgzv6 -- ash
    / # kafkacat -b kafka-service.kafka:9092 -L
    Metadata for all topics (from broker 1003: kafka-service.kafka:9092/1003):
     1 brokers:
      broker 1003 at kafka-service.kafka:9092 (controller)
     0 topics:
    / #