kubernetes, apache-kafka, kafka-consumer-api, portforwarding

Unable to send messages to port-forwarded Kafka pod


I’m playing around with a Kafka pod (the incubator/kafka chart) that I have running in a Kubernetes cluster, trying to send and receive some test data. I have a consumer running that I started with:

kubectl exec kafka-0 \
  -n kafka-namespace \
  -- kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test

I was able to successfully send data to the consumer using:

kubectl exec -i kafka-0 \
  -n kafka-namespace \
  -- kafka-console-producer \
  --broker-list localhost:9092 \
  --topic test < testdata.jsonl

I’d like to send data using port-forwarding, as that will make it easier for me to do some partitioning stuff. However, it keeps failing for me. I set up port forwarding by running:

kubectl port-forward \
  -n kafka-namespace \
  svc/kafka 9092:9092

I’m trying to send data to Kafka by running from my local machine:

kafka-console-producer \
  --broker-list localhost:9092 \
  --topic test < testdata.jsonl

In my port-forwarding terminal, I see "Handling connection for 9092" appear. However, the producer fails with this error:

[2022-07-27 15:48:44,227] ERROR Error when sending message to topic test with key: null, value: 139 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for test-0:120000 ms has passed since batch creation

If I’m doing port-forwarding, then I should just be able to use the kafka-console-producer command as if it’s running on localhost, right?


Solution

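  • Short answer: no, a plain port forward will not work on its own. A Kafka client uses the bootstrap address only for its first connection; the broker then returns the addresses from its advertised.listeners setting, and the client connects to those to actually send data. Behind a port forward those are in-cluster addresses that your local machine cannot reach, so the batches expire with the TimeoutException you are seeing. Secondly, the helm/charts repo is deprecated in its entirety and the Kafka chart has not been updated since Nov 2020, and that's the only incubator repo I am familiar with.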

    In any case, for the incubator chart you need external.enabled=true set in the values file, along with a few other values. This creates a Service resource that is reachable from outside the k8s environment. It exposes the brokers on a NodePort, so you do not need kubectl port-forward and can connect to the exposed brokers directly. Ref (search the page for "external"): https://github.com/helm/charts/blob/master/incubator/kafka/README.md

    For example (this one uses the Bitnami chart, where the key is externalAccess rather than external):

    values:
      rbac:
        create: true 
      externalAccess:
        enabled: true
        autoDiscovery:
          enabled: true
        service:
          type: NodePort
          port: 19092
    

    This creates a third Service, named kafka-0-external in my case, in addition to the kafka and kafka-headless Services. You can then port forward that Service.

    You can also use LoadBalancer as the type (be careful about what you're exposing to the public of course).

    The values above are for the bitnami/kafka chart, version 22.1.5.
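
    As a rough sketch, values like these could be applied and checked as follows. This assumes the keys shown under values: are saved to a plain Helm values file, that the Bitnami repo is already added under the alias bitnami, and that the release is called kafka. The function name install_kafka_external is hypothetical, as are the release name and repo alias.

    ```shell
    # Hypothetical helper; release name "kafka" and repo alias "bitnami" are assumptions.
    # $1 = namespace, $2 = path to a values file holding the externalAccess keys.
    install_kafka_external() {
      # Install or upgrade the chart with external access enabled,
      # then list the Services so the per-broker external one can be found.
      helm upgrade --install kafka bitnami/kafka \
        --version 22.1.5 \
        --namespace "$1" \
        --values "$2" &&
        kubectl get svc --namespace "$1"
    }
    ```

    Called as install_kafka_external kafka-namespace external-values.yaml (the file name is made up), the Service list should include the external Service alongside kafka and kafka-headless.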

    Alternatively, you can simply kubectl exec into a Kafka pod. Then you don't need any networking setup, and you don't need to clutter your host with Kafka downloads, since the pod has all the scripts you need.

    Also worth mentioning: if you want an easy, actively maintained way to run Kafka on Kubernetes, use an Operator such as Strimzi (https://strimzi.io).
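
To check which addresses the broker hands back through the plain port forward from the question, one option is to print the cluster metadata via the forwarded port. This is only a sketch: it assumes kcat (formerly kafkacat) is installed on your machine, and list_advertised_brokers is a hypothetical helper name, not part of any tool.

```shell
# Hypothetical helper; assumes the kcat CLI is installed locally.
# Prints cluster metadata fetched through the given bootstrap address
# (default localhost:9092), including the host:port each broker advertises.
list_advertised_brokers() {
  kcat -b "${1:-localhost:9092}" -L
}
```

With the port forward running, list_advertised_brokers localhost:9092 should list each broker together with the address it advertises to clients. If that address is a pod IP or an in-cluster DNS name, a producer on your machine has no route to it, which is consistent with the timeout in the question.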