Tags: python, apache-kafka, kafka-python

Reading the last message from Kafka


I am trying to read the last message from a Kafka topic, but I cannot get it to work. I tried the methods below; each one is listed with the error or problem it produced. Topic description:

$ kafka-topics.sh --bootstrap-server localhost:9092 --topic 52.5_13.4 --describe  
Topic: 52.5_13.4    TopicId: VFJtIO-UQBiktUeO5uVQ7w PartitionCount: 1   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: 52.5_13.4    Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

The topic contains multiple messages, which I can read via the client.

Setup:

from kafka import KafkaConsumer, TopicPartition
import json

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    consumer_timeout_ms=600,
)
topic = TopicPartition('52.5_13.4', 0)

Method 1

consumer.subscribe('52.5_13.4')
consumer.poll()
consumer.seek_to_end()

Throws

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/.local/lib/python3.10/site-packages/kafka/consumer/group.py", line 865, in seek_to_end
    assert partitions, 'No partitions are currently assigned'
AssertionError: No partitions are currently assigned
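A likely cause of this error, assuming kafka-python's defaults: `poll()` defaults to `timeout_ms=0`, so it can return before the subscription has been turned into a partition assignment, and `seek_to_end()` called with no arguments then finds no assigned partitions. The sketch below keeps polling until `consumer.assignment()` is non-empty. The helper name `wait_for_assignment` and its timeout parameters are hypothetical, not part of kafka-python.

```python
import time


def wait_for_assignment(consumer, timeout_s=10.0, poll_ms=100):
    """Poll a subscribed consumer until it has at least one assigned partition.

    Hypothetical helper; `consumer` is assumed to be a kafka-python
    KafkaConsumer that has already called subscribe(). Assignment is
    completed during later poll() calls, so one poll() with the default
    timeout_ms=0 may return too early. Any records returned by these polls
    are discarded, which is harmless when the next step is a seek.
    """
    deadline = time.monotonic() + timeout_s
    while not consumer.assignment():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no partitions assigned within {timeout_s} s")
        consumer.poll(timeout_ms=poll_ms)
    return consumer.assignment()


# Usage (assumes a running broker and the consumer from the Setup above):
#   consumer.subscribe(['52.5_13.4'])
#   wait_for_assignment(consumer)
#   consumer.seek_to_end()   # no longer hits the "No partitions" assertion
```

This only removes the assertion; as Method 2 shows, a consumer positioned at the end still has nothing to read.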

Method 2

consumer.assign([topic])
consumer.poll()
consumer.seek_to_end()

last_msg = None
for msg in consumer:
  print("Inside consumer")
  last_msg = msg.value
  print("+++")

This prints nothing, not even the "Inside consumer" line before msg.value; once the consumer timeout expires, it simply exits the for loop.
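The empty loop is most likely expected behaviour: `seek_to_end()` moves the position to the partition's log end offset, which is the offset the *next* produced message will receive, not the offset of the last existing one. The iterator therefore waits for new messages until `consumer_timeout_ms` expires. The last existing message sits at end offset minus one. The hypothetical helper below computes that target for every partition in an `end_offsets()` result and skips empty partitions; it assumes a non-transactional topic whose last message has not been removed by retention.

```python
def last_message_offsets(end_offsets):
    """Map each partition to the offset of its last existing message.

    Hypothetical helper. `end_offsets` is assumed to be the dict returned by
    kafka-python's KafkaConsumer.end_offsets(): partition -> log end offset,
    i.e. the offset the NEXT produced message will receive. seek_to_end()
    positions the consumer exactly there, so iteration only returns messages
    produced after the seek. Partitions with end offset 0 have never held a
    message and are skipped.
    """
    return {tp: end - 1 for tp, end in end_offsets.items() if end > 0}


# Usage (assumes a broker and the assigned partition `topic` from Method 2):
#   targets = last_message_offsets(consumer.end_offsets([topic]))
#   for tp, offset in targets.items():
#       consumer.seek(tp, offset)
```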

Method 3

consumer = KafkaConsumer(
    '52.5_13.4',
    bootstrap_servers='localhost:9092',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    consumer_timeout_ms=60000,
)
topic = TopicPartition('52.5_13.4', 0)

consumer.poll()
consumer.seek_to_end(topic)
for message in consumer:
    print(message)

Again, this prints nothing and exits the loop once the timeout expires.
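One way to check what is happening here is to compare the consumer's current position with the partition's end offset. `consumer.position()` and `consumer.end_offsets()` are kafka-python methods; `remaining_messages` is a hypothetical name for this sketch. If it returns 0 right after `consumer.seek_to_end(topic)`, the consumer is already past the last message and the loop has nothing to print until a new message is produced.

```python
def remaining_messages(consumer, tp):
    """Return how many existing messages lie between the consumer's position
    and the end of partition `tp`.

    Hypothetical helper for a kafka-python KafkaConsumer with `tp` assigned.
    Right after seek_to_end() the position equals the end offset, so this
    returns 0: nothing is left to read until a new message is produced.
    """
    end = consumer.end_offsets([tp])[tp]
    return end - consumer.position(tp)


# Usage (assumes the Method 3 consumer and `topic`):
#   consumer.seek_to_end(topic)
#   print(remaining_messages(consumer, topic))
```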


Solution

  • I couldn't make seek_to_end work, but combining end_offsets with seek does the job:

    from kafka import KafkaConsumer, TopicPartition
    import json

    consumer = KafkaConsumer(
        bootstrap_servers='localhost:9092',
        auto_offset_reset="earliest",
        enable_auto_commit=True,
        value_deserializer=lambda x: json.loads(x.decode("utf-8")),
        consumer_timeout_ms=600,
    )
    consumer.subscribe('52.5_13.4')
    partition = TopicPartition('52.5_13.4', 0)

    # The end offset is the offset the NEXT message will get,
    # so the last existing message sits at end offset - 1.
    end_offsets = consumer.end_offsets([partition])
    consumer.seek(partition, end_offsets[partition] - 1)

    for m in consumer:
        print(m.value)
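A somewhat more defensive variant of the same idea, sketched as a hypothetical `read_last_message` function: it uses `assign()` instead of `subscribe()` so the partition is assigned immediately, returns `None` for an empty partition (where `end - 1` would be `-1`, which is not a valid offset), and reads with a single `poll()` instead of iterating until `consumer_timeout_ms` expires. It assumes a kafka-python consumer created without a topic argument, since kafka-python does not allow mixing `assign()` with `subscribe()`.

```python
def read_last_message(consumer, tp, timeout_ms=1000):
    """Return the last existing record in partition `tp`, or None.

    Hypothetical helper. Assumes a kafka-python KafkaConsumer created
    WITHOUT a topic argument (assign() and subscribe() cannot be mixed)
    and a kafka.TopicPartition `tp`.
    """
    consumer.assign([tp])  # assigned immediately; no poll() needed before seek()
    end = consumer.end_offsets([tp])[tp]
    if end == 0:
        return None  # empty partition: end - 1 would be -1, an invalid offset
    consumer.seek(tp, end - 1)  # the last message sits one before the end offset
    records = consumer.poll(timeout_ms=timeout_ms, max_records=1)
    batch = records.get(tp, [])
    return batch[-1] if batch else None  # None if nothing arrived within timeout_ms


# Usage (assumes a running broker):
#   import json
#   from kafka import KafkaConsumer, TopicPartition
#   consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
#                            value_deserializer=lambda x: json.loads(x.decode("utf-8")))
#   last = read_last_message(consumer, TopicPartition('52.5_13.4', 0))
#   print(last.value if last else None)
```

Returning after one `poll()` avoids waiting for the full `consumer_timeout_ms` when only the single last message is wanted.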