I've got a spring-kafka consumer set up. It consumes Avro data from the topic, maps the values, and writes CSV files. I manually commit the offset once the file is either 25,000 records long or 5 minutes old - whichever comes first.
A problem occurs when we restart the app because of patching/releases.
I have a method like this:
@PreDestroy
public void destroy() {
    LOGGER.info("shutting down");
    writeCsv(true);
    acknowledgment.acknowledge(); // this normally commits the current offset
    LOGGER.info("package commited: " + acknowledgment.toString());
    LOGGER.info("shutting down completed");
}
So I've added some loggers there, and this is how the log looks:
08:05:47 INFO KafkaMessageListenerContainer$ListenerConsumer - myManualConsumer: Consumer stopped
08:05:47 INFO CsvWriter - shutting down
08:05:47 INFO CsvWriter - created file: FEEDBACK1630476236079.csv
08:05:47 INFO CsvWriter - package commited: Acknowledgment for ConsumerRecord(topic = feedback-topic, partition = 1, leaderEpoch = 17, offset = 544, CreateTime = 1630415419703, serialized key size = -1, serialized value size = 156)
08:05:47 INFO CsvWriter - shutting down completed
The offset is never committed, since the consumer stops working before the acknowledge() method is called. There are no errors in the log, and we are getting duplicates after the app is started again.
Also, one more question:
I want to set up a filter on the consumer like this:
if (event.getValue().equals("GOOD")) {
    addCsvRecord(event);
} else {
    acknowledgment.acknowledge(); // to let it read the next event
}
Let's say I'm at offset 100 and a GOOD event comes: I add it to the CSV file, the file waits for more records, and the offset is not committed yet. A BAD event comes up next, it is filtered out, and offset 101 is committed immediately. Then the file reaches its timeout and is about to close and call acknowledgment.acknowledge() on offset 100.
@PreDestroy is too late in the context lifecycle - the containers have already been stopped by then. Implement SmartLifecycle and do the acknowledgment in stop().
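Here is a minimal sketch of that idea (the field names and the phase value are assumptions, not your exact code): the writer keeps the latest Acknowledgment from the listener, and stop() flushes the file and commits while the listener container is still running.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class CsvWriter implements SmartLifecycle {

    private static final Logger LOGGER = LoggerFactory.getLogger(CsvWriter.class);

    private volatile Acknowledgment acknowledgment; // updated by the listener on each record
    private volatile boolean running;

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        LOGGER.info("shutting down");
        writeCsv(true);                      // close the current CSV file
        if (acknowledgment != null) {
            acknowledgment.acknowledge();    // commit while the consumer is still alive
        }
        running = false;
        LOGGER.info("shutting down completed");
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public int getPhase() {
        // assumption: a phase above the container's default makes this bean
        // stop before the listener container shuts the consumer down
        return Integer.MAX_VALUE;
    }

    private void writeCsv(boolean close) {
        // your existing CSV flushing logic
    }
}

The point is that stop() runs while the consumer is still up, so the final acknowledge can actually be committed before shutdown, unlike in @PreDestroy.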
For your second question, just don't commit the bad offset; you will still get the next record(s).
Kafka maintains two pointers, position and committed. They are related, but independent, for a running application.
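The position advances as records are polled, regardless of what has been committed; the committed offset only matters when the application restarts or a rebalance happens. So the filter can simply skip BAD events without acknowledging them, and the next flush-time commit will cover their offsets too. A rough sketch of the listener (Event and addCsvRecord are just the names from your snippet):

@KafkaListener(topics = "feedback-topic")
public void listen(ConsumerRecord<String, Event> record, Acknowledgment acknowledgment) {
    this.acknowledgment = acknowledgment;   // remember the latest ack for the flush/stop path
    Event event = record.value();
    if ("GOOD".equals(event.getValue())) {
        addCsvRecord(event);                // buffered; committed when the file is flushed
    }
    // BAD events are simply dropped - no acknowledge() needed here; the consumer's
    // position still moves past them and the next commit covers their offsets
}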