go apache-kafka kafka-consumer-api sarama

Kafka Consumer: How to programmatically consume from a specific offset in Go Sarama


I recently started working with Kafka. The project I'm working on uses Sarama.

To read messages I use a ConsumerGroup.

If foo returns false for a message, I need to read that message again after some time. How can this be done?

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        if ok := foo(message); ok {
            session.MarkMessage(message, "")
        } else {
            // ???
        }
    }

    return nil
}

Solution

  • You can reset a consumer group's offset to an older one by calling ResetOffset in the Setup() callback of your consumer group handler:

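    func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
        // topic, partition and offset are placeholders for the position
        // you want the group to resume from.
        sess.ResetOffset(topic, partition, offset, "")

        return nil
    }
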

    You can also do the same from the command line with kafka-consumer-groups. The group must have no active members when the reset runs:

    kafka-consumer-groups \
        --bootstrap-server localhost:9092 \
        --group my-consumer-group \
        --topic myTopicName \
        --reset-offsets \
        --to-offset 100 \
        --execute
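
For the case in the question (foo fails on one message in the middle of ConsumeClaim), the same ResetOffset call can be made per message. What follows is only a minimal sketch under stated assumptions, not Sarama's documented recipe: offsetSession, settle and fakeSession are hypothetical names introduced here. offsetSession lists just the two methods the sketch needs, with the signatures they have on sarama.ConsumerGroupSession, so a real session should satisfy it. fakeSession exists only so the sketch runs without a broker.

```go
package main

import "fmt"

// offsetSession is a hypothetical, narrowed view of sarama.ConsumerGroupSession:
// only the two methods used below, with the same signatures.
type offsetSession interface {
	MarkOffset(topic string, partition int32, offset int64, metadata string)
	ResetOffset(topic string, partition int32, offset int64, metadata string)
}

// settle records the outcome of processing one message and reports whether
// the caller should keep consuming from the current claim.
func settle(sess offsetSession, topic string, partition int32, offset int64, ok bool) bool {
	if ok {
		// Equivalent to session.MarkMessage: the next offset to read is offset+1.
		sess.MarkOffset(topic, partition, offset+1, "")
		return true
	}
	// Point the group's offset back at the failed message so that it is the
	// first one delivered the next time this partition is claimed.
	sess.ResetOffset(topic, partition, offset, "")
	return false
}

// fakeSession is a stand-in that just remembers the last offset it was given.
type fakeSession struct{ next int64 }

func (f *fakeSession) MarkOffset(_ string, _ int32, offset int64, _ string)  { f.next = offset }
func (f *fakeSession) ResetOffset(_ string, _ int32, offset int64, _ string) { f.next = offset }

func main() {
	sess := &fakeSession{}
	fmt.Println(settle(sess, "myTopicName", 0, 41, true), sess.next)  // true 42
	fmt.Println(settle(sess, "myTopicName", 0, 42, false), sess.next) // false 42
}
```

Inside ConsumeClaim this would be called as settle(session, message.Topic, message.Partition, message.Offset, foo(message)), returning from ConsumeClaim when it reports false. As far as I understand, ResetOffset only changes the offset the group will commit and does not rewind the claim that is already running, so the message is redelivered only once the session ends and Consume is called again. The "after some time" delay would then go in the loop that calls Consume.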