My need is to make the producer to start from the last message it processed before it crashed. Fortunately I am in the case of having only one topic, with one partition and one consumer.
To do so I tried https://github.com/Shopify/sarama but it doesn't seems to be available yet. I am now using https://godoc.org/github.com/bsm/sarama-cluster, which allow me to commit every message offset.
I cannot retrieve the last committed offset
I cannot figure out how to make a sarama consumer to start from said offset. The only parameter I've found so far is Config.Producer.Offsets.Initial
.
OffsetNewest
will make it start from the last message produced, not the last processed b the consumer.Thank in advance
P.S. I am using Kafka 10.0, so the offsets are stores in a kafka and not in zookeeper.
EDIT1: Partial solution: fetch all the messages since sarama.OffsetOldest and skip all of them until we found a non processed one.
If offset was already saved for a partition, sarama-cluster will resume consumption from that offset. The Config.Producer.Offsets.Initial
option is used only if no saved offset is present (first run for a consumer group).
You can verify this by adding the following line at the beginning of your main()
function:
sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)
Then you'll see something like the following in the output:
cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12
The 12 in that is the offset Sarama is going to start from for that partition (sample/0).