I am using Kafka and Debezium to capture row-level change events from a database, and that part works as expected.
In Python I have set up a consumer to process the JSON messages from the topic:
import json

from confluent_kafka import Consumer, KafkaError

# Kafka consumer configuration
config = {
    'bootstrap.servers': 'my-ip:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'latest'
}
# Create the Consumer instance
consumer = Consumer(config)
# Subscribe to the topic
topic = 'my-ip.Database.Table'
consumer.subscribe([topic])
It works fine; the problem is that I need this consumer to only care about the latest messages.
I could process every message and discard the old ones based on their timestamps, but I would rather start consuming from the correct offset to begin with.
This is the loop I am using to process messages:
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            print("No new messages.")
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print("End of partition reached.")
                continue
            else:
                print(f"Error: {msg.error()}")
                break
        message = json.loads(msg.value().decode("utf-8"))
        payload = message.get("payload", {})
        operation = payload.get("op")  # c: create/insert, u: update, d: delete
        match operation:
            case "c":
                print("Processing insert operation.")
                process_message(payload["after"], "Insert")
            case "u":
                print("Processing update operation.")
                process_message(payload["after"], "Update")
            case "d":
                print("Delete operation detected, not processing.")
            case _:
                print(f"Unhandled operation type: {operation}")
except KeyboardInterrupt:
    print("Exiting consumer...")
finally:
    consumer.close()
    print("Consumer closed.")
How can I move the offset to the end in the consumer code I have? Thank you!
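You're already setting 'auto.offset.reset': 'latest' when you create the consumer (it defaults to latest anyway). That setting only applies when the group has no committed offset for a partition; otherwise the consumer resumes from the committed offset.
If you want to skip to the end of the topic while processing or before polling (rather than resume from the committed offset), use the seek functions of the consumer.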
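For example, with confluent_kafka one way to do this before polling is to pass an on_assign callback to subscribe() that sets every assigned partition's offset to OFFSET_END and then calls assign(). This is only a minimal sketch and has not been run against your cluster; the helper names skip_to_end and build_latest_only_consumer are made up for illustration, and config and topic are assumed to be the ones from the question.

```python
def skip_to_end(consumer, partitions, end_offset):
    """Rebalance callback body: start every assigned partition at end_offset."""
    for partition in partitions:
        # Overwrite whatever offset came with the assignment.
        partition.offset = end_offset
    # Hand the modified list back so fetching starts from the new offsets.
    consumer.assign(partitions)


def build_latest_only_consumer(config, topic):
    """Create a consumer that jumps to the end of each partition on assignment."""
    # Imported here so skip_to_end can be exercised without the client installed.
    from confluent_kafka import Consumer, OFFSET_END

    consumer = Consumer(config)
    consumer.subscribe(
        [topic],
        on_assign=lambda c, partitions: skip_to_end(c, partitions, OFFSET_END),
    )
    return consumer
```

You would then replace the Consumer(config) and subscribe() lines with consumer = build_latest_only_consumer(config, topic) and keep the polling loop unchanged. Keep in mind that the callback fires on every rebalance, not just at startup, so anything produced to a partition while it is being reassigned would most likely be skipped as well.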