python, apache-kafka, kombu

Connecting to Kafka using kombu fails at class NoBrokersAvailable(confluent_kafka.KafkaException):


I tried to connect to Kafka using kombu (v5.3.1), starting with a very simple test program, but got an exception on connect. I have Kafka and ZooKeeper up and running locally, and I can connect and send messages using kafka-python. Does anyone know what I might be missing? Thanks.

from __future__ import annotations

import datetime

from kombu import Connection

with Connection('confluentkafka://127.0.0.1:9092') as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    message = f'helloworld, sent at {datetime.datetime.today()}'
    simple_queue.put(message)
    print(f'Sent: {message}')
    simple_queue.close()

The exception:

Traceback (most recent call last):
  File "/Users/csun/bah/evaluation/py4J/src/main/java/python/kombu_producer.py", line 23, in <module>
    with Connection('confluentkafka://127.0.0.1:9092') as conn:
  File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/connection.py", line 201, in __init__
    if not get_transport_cls(transport).can_parse_url:
  File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/transport/__init__.py", line 90, in get_transport_cls
    _transport_cache[transport] = resolve_transport(transport)
  File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/transport/__init__.py", line 75, in resolve_transport
    return symbol_by_name(transport)
  File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/utils/imports.py", line 59, in symbol_by_name
    module = imp(module_name, package=package, **kwargs)
  File "/Users/csun/.pyenv/versions/3.9.13/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/transport/confluentkafka.py", line 89, in <module>
    class NoBrokersAvailable(confluent_kafka.KafkaException):
AttributeError: 'NoneType' object has no attribute 'KafkaException'

I looked into the kombu code, and it seems that Connection('confluentkafka://127.0.0.1:9092') is all that is needed to connect to Kafka. I can connect to Kafka with the kafka-python package without any issue.

My goal is to connect to a Kafka broker and send and receive messages on a specific topic.


Solution

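  • The error says that confluent_kafka is None at the point where the class is defined, and the source code suggests why: the import of confluent_kafka appears to be guarded, so a failed import only surfaces later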

    https://github.com/celery/kombu/blob/main/kombu/transport/confluentkafka.py#L71-L79

    In other words, you need to install confluent_kafka first (the package is named confluent-kafka on PyPI)

    https://github.com/confluentinc/confluent-kafka-python#install


    If kafka-python works, I'd continue to use it, since it is pure Python and does not depend on the librdkafka C bindings.
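
To make the failure mode concrete, here is a minimal sketch of how a guarded import can produce the AttributeError from the traceback, followed by a pre-flight check. It does not reproduce kombu's actual code: the helpers optional_import and is_installed and the module name module_that_is_not_installed_xyz are made up for illustration. It only assumes that the transport falls back to None when the import fails, which is what the error message suggests.

```python
import importlib
import importlib.util


def optional_import(name):
    """Import a module by name, returning None if it is not installed."""
    try:
        return importlib.import_module(name)
    except ImportError:
        return None


def is_installed(name):
    """Return True if a top-level module can be found, without importing it."""
    return importlib.util.find_spec(name) is not None


# Hypothetical module name, chosen because it is certainly not installed.
missing = optional_import("module_that_is_not_installed_xyz")

try:
    # Same shape as the failing line in the traceback: the base class is
    # looked up on a name that is None because the import failed earlier.
    class NoBrokersAvailable(missing.KafkaException):
        pass
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)

# Pre-flight check to run in the environment that will use kombu.
if not is_installed("confluent_kafka"):
    print("confluent_kafka is missing; install it with: pip install confluent-kafka")
```

Running the check inside the same virtual environment as the test program (the venv shown in the traceback paths) tells you whether Connection('confluentkafka://...') has a chance of importing its transport at all.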