I tried to connect to Kafka using kombu (v5.3.1), starting with a very simple test program, but got an exception at connect. I have Kafka and ZooKeeper up and running locally, and I can connect and send messages with kafka-python. Does anyone know what I might be missing? Thanks.
    from __future__ import annotations

    import datetime

    from kombu import Connection

    with Connection('confluentkafka://127.0.0.1:9092') as conn:
        simple_queue = conn.SimpleQueue('simple_queue')
        message = f'helloworld, sent at {datetime.datetime.today()}'
        simple_queue.put(message)
        print(f'Sent: {message}')
        simple_queue.close()
The exception:
Traceback (most recent call last):
File "/Users/csun/bah/evaluation/py4J/src/main/java/python/kombu_producer.py", line 23, in <module>
with Connection('confluentkafka://127.0.0.1:9092') as conn:
File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/connection.py", line 201, in __init__
if not get_transport_cls(transport).can_parse_url:
File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/transport/__init__.py", line 90, in get_transport_cls
_transport_cache[transport] = resolve_transport(transport)
File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/transport/__init__.py", line 75, in resolve_transport
return symbol_by_name(transport)
File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/utils/imports.py", line 59, in symbol_by_name
module = imp(module_name, package=package, **kwargs)
File "/Users/csun/.pyenv/versions/3.9.13/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/Users/csun/bah/evaluation/py4J/src/main/java/python/venv/lib/python3.9/site-packages/kombu/transport/confluentkafka.py", line 89, in <module>
class NoBrokersAvailable(confluent_kafka.KafkaException):
AttributeError: 'NoneType' object has no attribute 'KafkaException'
I looked into the kombu code, and it seems that Connection('confluentkafka://127.0.0.1:9092') is all that is needed to connect to Kafka. I can connect to the same broker with the kafka-python package without any issue.
My goal is to connect to the Kafka broker and send and receive messages on a specific topic.
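Based on that specific error, the source code shows that an import has failed. When confluent_kafka cannot be imported, kombu sets the name to None, which is why the traceback ends in an AttributeError on a 'NoneType' object rather than an ImportError: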
https://github.com/celery/kombu/blob/main/kombu/transport/confluentkafka.py#L71-L79
In other words, you need to actually install confluent_kafka first:
https://github.com/confluentinc/confluent-kafka-python#install
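To confirm whether the package is visible from the same virtualenv that runs kombu, a quick check like the one below may help. It is only a sketch: the helper name is_installed is made up for illustration, and the try/except block mirrors the optional-import pattern I believe the linked kombu module uses, not its exact code.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(module_name) is not None


# Optional-import pattern (assumed to resemble the kombu transport module):
# when the import fails, the name is bound to None, so any later attribute
# access raises AttributeError instead of ImportError.
try:
    import confluent_kafka
except ImportError:
    confluent_kafka = None


if __name__ == "__main__":
    if is_installed("confluent_kafka"):
        print("confluent_kafka was found in this environment")
    else:
        print("confluent_kafka is missing; try: pip install confluent-kafka")
```

Run it with the interpreter from the venv shown in the traceback. If it reports the package as missing, install it there and retry the kombu script.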
If kafka-python works, then I'd continue to use that, since it is pure Python and does not depend on the librdkafka C bindings.