I'm trying to switch Python code from aiokafka to confluent_kafka, and I'm having problems reading historical data.
The system has only one producer for a given topic and several independent consumers (each with a separate group ID). When each consumer starts, it wants to read the most recent historical messages for a subset of topics (call them historical topics), then read all new messages. The exact starting point of the historical data doesn't matter; the main point is to get information for topics that are rarely written to. The topics for which historical data are wanted will only ever have one partition.
It's getting the historical data that is giving me fits.
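For reference, the aiokafka version of this was straightforward, because manual assignment lets you seek before reading anything. Roughly (a from-memory sketch, not my exact code):

from aiokafka import AIOKafkaConsumer, TopicPartition


async def read_with_history(topic, broker_addr, max_history=3):
    # No group_id: this consumer tracks its own position
    consumer = AIOKafkaConsumer(bootstrap_servers=broker_addr)
    await consumer.start()
    try:
        tp = TopicPartition(topic, 0)  # historical topics have one partition
        consumer.assign([tp])
        end = (await consumer.end_offsets([tp]))[tp]
        consumer.seek(tp, max(0, end - max_history))
        async for message in consumer:
            print(message.value)
    finally:
        await consumer.stop()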
I would prefer not to read any messages before seeking, since any message returned is likely to be newer than the point I want to start from. But it appears one has to call Consumer.poll at least once before Kafka assigns topic partitions.
What is the recommended sequence?
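For concreteness, this is the kind of poll-first sequence I would like to avoid (a sketch with a made-up helper name, not my real code):

def subscribe_then_seek(consumer, topic_names, max_history):
    """Poll until partitions are assigned, then seek back."""
    consumer.subscribe(topic_names)
    # Drive the group rebalance; any message these polls return was
    # already read from an offset I never chose.
    while not consumer.assignment():
        consumer.poll(timeout=0.1)
    for partition in consumer.assignment():
        _, high = consumer.get_watermark_offsets(partition)
        partition.offset = max(0, high - max_history)
        consumer.seek(partition)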
I have tried two basic approaches, both built around the on_assign callback argument to Consumer.subscribe: read the current offset and call seek. In both cases:

- Consumer.seek usually or always fails with "Local: Erroneous state".
- Consumer.position always returns -1001 (OFFSET_INVALID), which might be a clue. To get around that I call Consumer.get_watermark_offsets instead.

Here is a simple example using on_assign:
from confluent_kafka import Consumer, KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
import base64
import os

max_history = 3
broker_addr = "broker:29092"
topic_names = ["test.topic"]


def seek_back(consumer, partitions):
    """on_assign callback: seek each partition back up to max_history messages."""
    print(f"seek_back({partitions})")

    # Show that consumer.position returns nothing useful (-1001 = OFFSET_INVALID)
    position_partitions = consumer.position(partitions)
    print(f"{position_partitions=}")

    for partition in partitions:
        _, offset = consumer.get_watermark_offsets(partition)
        print(f"{partition.topic} has offset {offset}")
        if offset <= 0:
            continue
        partition.offset = max(0, offset - max_history)
        try:
            consumer.seek(partition)
        except Exception as e:
            print(f"{partition.topic} seek to {partition.offset} failed: {e!r}")
        else:
            print(f"{partition.topic} seek to {partition.offset} succeeded")


def run(topic_names):
    # A random group ID, so each run starts with no committed offsets
    random_str = base64.urlsafe_b64encode(os.urandom(12)).decode().replace("=", "_")
    consumer = Consumer(
        {
            "group.id": random_str,
            "bootstrap.servers": broker_addr,
            "allow.auto.create.topics": False,
        }
    )

    # Create the topics explicitly, ignoring "already exists" errors
    new_topic_list = [
        NewTopic(topic_name, num_partitions=1, replication_factor=1)
        for topic_name in topic_names
    ]
    broker_client = AdminClient({"bootstrap.servers": broker_addr})
    create_result = broker_client.create_topics(new_topic_list)
    for topic_name, future in create_result.items():
        exception = future.exception()
        if exception is None:
            continue
        elif (
            isinstance(exception.args[0], KafkaError)
            and exception.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS
        ):
            pass
        else:
            print(f"Failed to create topic {topic_name}: {exception!r}")
            raise exception

    consumer.subscribe(topic_names, on_assign=seek_back)

    while True:
        message = consumer.poll(timeout=0.1)
        if message is not None:
            error = message.error()
            if error is not None:
                # KafkaError is not an Exception; wrap it before raising
                raise KafkaException(error)
            print(f"read {message=}")
            return  # one message is enough for this demonstration


run(topic_names)
Running this after writing some messages to that topic (using other code) gives me:
seek_back([TopicPartition{topic=test.topic,partition=0,offset=-1001,error=None}])
position_partitions=[TopicPartition{topic=test.topic,partition=0,offset=-1001,error=None}]
test.topic has offset 10
test.topic seek to 7 failed: KafkaException(KafkaError{code=_STATE,val=-172,str="Failed to seek to offset 7: Local: Erroneous state"})
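For completeness, the messages were produced by separate code; something like this sketch (my real producer uses Avro serialization and the schema registry):

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "broker:29092"})
for i in range(10):
    # Plain bytes are enough to reproduce the seek problem
    producer.produce("test.topic", f"message {i}".encode())
producer.flush()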
I am using confluent_kafka 1.8.2 and running the broker with the Docker image confluentinc/cp-enterprise-kafka:6.2.4 (along with the same version of ZooKeeper and Schema Registry, since my normal code uses Avro schemas).
From https://github.com/confluentinc/confluent-kafka-python/issues/11#issuecomment-230089107 it appears that one solution is to specify an on_assign callback to Consumer.subscribe, then call Consumer.assign inside the on_assign callback, e.g.:
from confluent_kafka import OFFSET_BEGINNING

MAX_HISTORY = 3  # how many old messages to read on startup


def on_assign_callback(consumer, partitions):
    """Modify assigned partitions to read up to MAX_HISTORY old messages."""
    for partition in partitions:
        min_offset, max_offset = consumer.get_watermark_offsets(partition)
        desired_offset = max_offset - MAX_HISTORY
        if desired_offset <= min_offset:
            # Not enough history available: read from the very beginning
            desired_offset = OFFSET_BEGINNING
        partition.offset = desired_offset
    consumer.assign(partitions)
Subtleties: create the consumer with "auto.offset.reset": "earliest". That way, if the broker discards data while the on_assign callback is running (deleting the data at the specified offset), the consumer will read from the beginning.
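Putting it together, the consumer setup then looks something like this (the group ID is a placeholder; my real code generates one):

from confluent_kafka import Consumer

broker_addr = "broker:29092"
topic_names = ["test.topic"]

consumer = Consumer(
    {
        "group.id": "my-group",  # placeholder
        "bootstrap.servers": broker_addr,
        # If the offset chosen by on_assign_callback has already been
        # deleted by the time fetching starts, restart from the oldest
        # available message instead of the default "latest".
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(topic_names, on_assign=on_assign_callback)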