Tags: python, apache-kafka, kafka-consumer-api

How can I start consuming Kafka messages in real time rather than from the last committed offset?


I am using Kafka and Debezium to capture row-level change events from a database, and that part works as expected.

In Python, I have set up a consumer to process the JSON messages from the topic:

import json

from confluent_kafka import Consumer, KafkaError

# Kafka consumer configuration
config = {
    'bootstrap.servers': 'my-ip:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'latest'  
}

# Create the Consumer instance
consumer = Consumer(config)

# Subscribe to the topic
topic = 'my-ip.Database.Table'
consumer.subscribe([topic])

It works fine; the problem is that I need this consumer to care only about the latest messages, i.e. the ones produced after it starts.

I could process every message and discard old ones based on their timestamps, but I would rather start consuming from the correct offset in the first place.
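The timestamp-based filtering mentioned above could be sketched like this. It is a minimal sketch assuming the confluent-kafka client (which the Consumer and KafkaError names suggest), whose Message.timestamp() returns a (timestamp_type, epoch_milliseconds) tuple; the helper name is_fresh is hypothetical.

```python
import time

# confluent-kafka reports a missing timestamp with type 0
# (confluent_kafka.TIMESTAMP_NOT_AVAILABLE); mirrored here so the helper
# can run without importing the library.
TIMESTAMP_NOT_AVAILABLE = 0


def is_fresh(timestamp, start_ms):
    """Return True if a message timestamp is at or after start_ms.

    `timestamp` is the (timestamp_type, epoch_ms) tuple returned by
    confluent-kafka's Message.timestamp().
    """
    ts_type, ts_ms = timestamp
    if ts_type == TIMESTAMP_NOT_AVAILABLE:
        return False  # no usable timestamp: treat the message as stale
    return ts_ms >= start_ms


# Capture the start time once, before entering the poll loop
start_ms = int(time.time() * 1000)
```

In the poll loop, you would call is_fresh(msg.timestamp(), start_ms) right after the error checks and skip the message when it returns False. Note that the old messages are still fetched from the broker and then thrown away, which is exactly the overhead that starting from the right offset avoids.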

This is the loop I was using to process messages:

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            print("No new messages.")
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print("End of partition reached.")
                continue
            else:
                print(f"Error: {msg.error()}")
                break

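        # Debezium emits a tombstone (null value) after each delete by default; skip it
        if msg.value() is None:
            continue
        message = json.loads(msg.value().decode("utf-8"))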
        payload = message.get("payload", {})
        operation = payload.get("op")  # c: create/insert, u: update, d: delete

        match operation:
            case "c":
                print("Processing insert operation.")
                process_message(payload["after"], "Insert")
            case "u":
                print("Processing update operation.")
                process_message(payload["after"], "Update")
            case "d":
                print("Delete operation detected, not processing.")
            case _:
                print(f"Unhandled operation type: {operation}")

except KeyboardInterrupt:
    print("Exiting consumer...")
finally:
    consumer.close()
    print("Consumer closed.")

How can I move the offset to the end of the topic from within this consumer code? Thank you!


Solution

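  • You're already setting 'auto.offset.reset': 'latest' when you create the consumer, and it defaults to latest anyway. However, that setting only applies when the consumer group has no committed offset for a partition (or the committed offset is out of range). Since my-group has presumably already committed offsets, the consumer resumes from those instead of jumping to the end.

    If you want to skip to the end of the topic while processing or before polling (rather than use the committed offset), use the consumer's seek functions.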