I have the following program to consume all the messages arriving on a Kafka topic.
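    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my_test_topic',
                             group_id='my-group',
                             bootstrap_servers=['my_kafka:9092'])

    # Iterating over the consumer blocks until the next message arrives.
    for message in consumer:
        consumer.commit()
        print("%s key=%s value=%s" % (message.topic, message.key,
                                      message.value))

    # Not reached in normal operation: the loop above has no exit condition.
    consumer.close()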
With this program I can consume all the messages arriving on the topic. However, once all the messages have been consumed, I want to close the Kafka consumer, and that never happens. How can I do this?
I can now close the Kafka consumer by passing the consumer_timeout_ms argument to KafkaConsumer. It takes a timeout value in milliseconds. Below is the code snippet.
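    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my_test_topic',
                             group_id='my-group',
                             bootstrap_servers=['my_kafka:9092'],
                             consumer_timeout_ms=1000)  # stop iterating after 1 s of silence

    # The loop ends once no message has arrived for consumer_timeout_ms.
    for message in consumer:
        consumer.commit()
        print("%s key=%s value=%s" % (message.topic, message.key,
                                      message.value))

    # Reached after the timeout ends the loop.
    consumer.close()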
In the code above, if the consumer does not receive any message for 1 second, the for loop ends and consumer.close() closes the session.
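An alternative that makes the stop condition explicit is to call poll() yourself and leave the loop when a poll comes back empty. The following is only a sketch under some assumptions: the helper name drain and its idle_timeout_ms parameter are made up for illustration, the consumer passed in is expected to be a KafkaConsumer created without consumer_timeout_ms, and an empty poll is treated as "nothing left to read", which may not hold right after startup or during a group rebalance.

```python
def drain(consumer, idle_timeout_ms=1000):
    """Read messages until one poll returns nothing, then close the consumer.

    `drain` and `idle_timeout_ms` are hypothetical names for this sketch.
    `consumer` is assumed to behave like kafka-python's KafkaConsumer:
    poll(timeout_ms=...) returns a dict mapping partitions to record lists.
    """
    messages = []
    try:
        while True:
            batches = consumer.poll(timeout_ms=idle_timeout_ms)
            if not batches:
                # Nothing arrived within the timeout: assume the topic is drained.
                break
            for records in batches.values():
                messages.extend(records)
            # Commit only after the batch has been collected.
            consumer.commit()
    finally:
        # Always release the connection, even if polling raised an error.
        consumer.close()
    return messages
```

With a consumer built as in the first snippet, this would be called as drain(consumer), and each returned item would expose topic, key, and value just like the messages in the for loop. Compared with consumer_timeout_ms, the trade-off is a few more lines in exchange for a visible exit condition and a close() that runs even on errors.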