Search code examples
Tags: python, amazon-web-services, apache-kafka, aws-msk

How to connect Python consumer to AWS MSK


I'm trying to connect my Python consumer to an AWS MSK cluster. How can I do that?

I have an AWS MSK cluster running, and I'm trying to consume messages from it using Python and kafka-python.

The error I'm getting:

Traceback (most recent call last):
  File "consumer.py", line 23, in <module>
    for message in consumer:
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 675, in _poll_once
    self._coordinator.poll()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 270, in poll
    self.ensure_coordinator_ready()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/coordinator/base.py", line 258, in ensure_coordinator_ready
    self._client.poll(future=future)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/client_async.py", line 582, in poll
    self._maybe_connect(node_id)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/client_async.py", line 392, in _maybe_connect
    conn.connect()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/conn.py", line 429, in connect
    if self._try_handshake():
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/conn.py", line 508, in _try_handshake
    self._sock.do_handshake()
  File "/usr/lib/python3.6/ssl.py", line 1077, in do_handshake
    self._sslobj.do_handshake()
  File "/usr/lib/python3.6/ssl.py", line 689, in do_handshake
    self._sslobj.do_handshake()
OSError: [Errno 0] Error

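Solution

The traceback fails inside the TLS handshake (`ssl.py`, `do_handshake`), so the client's security settings must match the MSK listener it connects to. The examples below connect to plaintext brokers on port 9092. To use MSK's TLS listener instead, point `bootstrap_servers` at the cluster's TLS bootstrap brokers (port 9094) and pass `security_protocol='SSL'`.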

  • Using kafka-python:

    from kafka import KafkaConsumer
    
    if __name__ == '__main__':
        # Example consumer. 'kafka2:9092' is a placeholder plaintext broker;
        # for MSK's TLS listener use the cluster's TLS bootstrap brokers
        # (port 9094) together with security_protocol='SSL'.
        # NOTE: the producer example publishes to 'raw_recipes', so set
        # topic_name to match when running the two examples together.
        topic_name = 'example-topic'

        consumer = KafkaConsumer(
            topic_name,
            # Where to start when there is no valid committed offset.
            auto_offset_reset='earliest',
            bootstrap_servers=['kafka2:9092'],
            # kafka-python's default, made explicit; must match the listener.
            security_protocol='PLAINTEXT',
            # Pins the protocol version instead of letting the client probe
            # the broker for it.
            api_version=(0, 10),
            # Iteration stops after 1 s without a new message.
            consumer_timeout_ms=1000,
        )
        try:
            for msg in consumer:
                print(msg.value)
        finally:
            # Runs even if iteration raises (e.g. a failed TLS handshake),
            # so the consumer's connections are always released.
            consumer.close()
    

    from time import sleep
    
    from kafka import KafkaProducer
    
    def publish_message(producer_instance, topic_name, key, value, timeout=10):
        """Publish one UTF-8 encoded key/value record and wait for the broker.

        Errors are printed to stdout instead of raised, so a caller's publish
        loop keeps running (best-effort semantics).

        Args:
            producer_instance: a connected KafkaProducer (or any object with
                the same send()/flush() interface).
            topic_name: topic to publish to.
            key: message key (str).
            value: message payload (str).
            timeout: seconds to wait for the broker to acknowledge the record.

        Returns:
            True if the record was acknowledged, False if anything failed.
        """
        try:
            key_bytes = bytes(key, encoding='utf-8')
            value_bytes = bytes(value, encoding='utf-8')
            future = producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
            producer_instance.flush()
            # send() is asynchronous and flush() does not raise for failed
            # records; get() re-raises the delivery error, if there was one.
            future.get(timeout=timeout)
        except Exception as ex:
            print('Exception in publishing message')
            print(str(ex))
            return False
        print('Message ' + key + ' published successfully.')
        return True
    
    def connect_kafka_producer(bootstrap_servers=None, **config):
        """Create a KafkaProducer, or return None if creating it fails.

        Args:
            bootstrap_servers: list of 'host:port' broker addresses; defaults
                to ['kafka1:9092'].
            **config: extra KafkaProducer settings passed through unchanged,
                e.g. security_protocol='SSL' for an MSK TLS listener.

        Returns:
            The producer, or None if constructing it raised an Exception
            (the error is printed to stdout).
        """
        if bootstrap_servers is None:
            bootstrap_servers = ['kafka1:9092']
        try:
            return KafkaProducer(bootstrap_servers=bootstrap_servers, **config)
        except Exception as ex:
            # Returning here rather than from a `finally` clause converts only
            # ordinary errors into None; KeyboardInterrupt/SystemExit propagate.
            print('Exception while connecting Kafka')
            print(str(ex))
            return None
    
    if __name__ == '__main__':
        kafka_producer = connect_kafka_producer()
        if kafka_producer is None:
            # Without a producer every publish would fail; stop instead of
            # looping forever on errors (the cause was already printed).
            raise SystemExit(1)
        x = 0
        try:
            # Publish numbered messages until interrupted with Ctrl+C.
            while True:
                publish_message(kafka_producer, 'raw_recipes', str(x), 'This is message ' + str(x))
                x += 1
        except KeyboardInterrupt:
            pass
        finally:
            # Reached on Ctrl+C or any error, so the producer is shut down.
            kafka_producer.close()