python, apache-kafka, consumer, pykafka

How to select the starting offset in a PyKafka SimpleConsumer?


In my Kafka cluster I have a single-partition topic with a simple consumer that processes all incoming messages. If an error is found in the processed data, I want to reprocess every message from a certain offset (not from the beginning), in the same order, so that I can fix the inconsistency while keeping Kafka's original message sequence.

Is there a way to do this with PyKafka? I can't figure it out.


Solution

  • You need to call reset_offsets(). For example:

    consumer = topic.get_simple_consumer(consumer_group="example")
    partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.values()]
    # because we passed in a consumer_group the new offsets will be saved in Kafka
    consumer.reset_offsets(partition_offsets=partition_offset_pairs)
    

    (where get_offset_for_partition() is a function you define). Or for a single-partition topic:

    # read from offset 123456
    consumer = topic.get_simple_consumer()
    partition = topic.partitions[0]
    consumer.reset_offsets([(partition, 123456)])
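
For the multi-partition example, get_offset_for_partition() could be as simple as a lookup keyed by partition id. This is only a minimal sketch: SAVED_OFFSETS and DEFAULT_OFFSET are hypothetical names, the offsets are placeholder values, and it assumes the partition objects expose an integer id attribute, as PyKafka's Partition does.

```python
# Hypothetical checkpoint store: partition id -> offset to resume from.
# In practice this might be loaded from a database or a file.
SAVED_OFFSETS = {0: 123456, 1: 98000}

# Assumption: partitions without a saved checkpoint are reset to offset 0.
DEFAULT_OFFSET = 0


def get_offset_for_partition(partition):
    """Return the offset that `partition` should be reset to."""
    return SAVED_OFFSETS.get(partition.id, DEFAULT_OFFSET)
```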
    

    The same reset_offsets() method is also available on the BalancedConsumer and ManagedBalancedConsumer classes.

    Note that, by design, Kafka only guarantees message ordering within each topic partition, not across partitions. With a single-partition topic, replaying from an offset therefore preserves the original order.
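
Putting the reset and the reprocessing together might look like the sketch below. replay_from, handle and end_offset are hypothetical names, not part of PyKafka. It assumes the consumer was created with consumer_timeout_ms set, so that consume() returns None when nothing more arrives instead of blocking forever. PyKafka may treat the integer passed to reset_offsets() as the last consumed offset, in which case the first message returned would be start_offset + 1, so it is worth checking this against your version.

```python
def replay_from(consumer, partition, start_offset, handle, end_offset=None):
    """Rewind one partition and re-run handle() on its messages in offset order.

    Returns the number of messages passed to handle().
    """
    # reset_offsets() takes a list of (partition, offset) pairs.
    consumer.reset_offsets([(partition, start_offset)])

    processed = 0
    while True:
        # Assumption: consume() returns None once consumer_timeout_ms elapses
        # with no new message available.
        message = consumer.consume()
        if message is None:
            break
        if message.offset < start_offset:
            # Defensive: ignore anything older than the replay point.
            continue
        handle(message)
        processed += 1
        # Optional stop condition, e.g. the offset where the error was noticed.
        if end_offset is not None and message.offset >= end_offset:
            break
    return processed
```

With the single-partition setup from the answer, a call could look like replay_from(consumer, topic.partitions[0], 123456, process_message), where process_message is whatever function already handles incoming messages.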