I'm developing a reactive application using Quarkus and Panache reactive.
When I consume a message from Kafka, I have to update the database, and I also want to handle duplicate messages.
In Avoiding message losses, duplication and lost / multiple processing in Kafka, the author suggests performing two inserts in the DBMS: one to keep track of the message offset and one for the business logic.
If a failure happens on the consumer side, the same message can be processed again after the restart; in that case the first insert is going to fail, and the second insert has to be skipped.
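For context, here is a minimal sketch of what the offset-tracking entity could look like with reactive Panache. The field names match the code below, but the unique constraint and the jakarta.persistence annotations are my assumptions (older Quarkus versions use javax.persistence); the constraint is what makes the first insert fail when the same record is consumed a second time.

import io.quarkus.hibernate.reactive.panache.PanacheEntity;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
import jakarta.persistence.UniqueConstraint;

// One row per processed record: replaying the same message violates the
// unique constraint, so the tracking insert fails and the consumer can
// skip the business insert.
@Entity
@Table(uniqueConstraints = @UniqueConstraint(columnNames = { "topic", "partition", "offsetN" }))
public class KafkaState extends PanacheEntity {
    public String topic;
    public int partition;
    public long offsetN;
}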
Based on the information in the Quarkus Kafka guide, I wrote the following code:
@Inject
Mutiny.Session session;

@ActivateRequestContext
public Uni<Void> persist(ConsumerRecord<Long, String> record) {
    return session.withTransaction(t -> {
        KafkaState state = new KafkaState();
        state.topic = record.topic();
        state.partition = record.partition();
        state.offsetN = record.offset();
        state.persist();

        Event event = new Event();
        event.key = record.key();
        event.message = record.value();
        return event.persistAndFlush().replaceWithVoid();
    }).onTermination()
        .call(() -> session.close())
        .onFailure().call(t -> {
            System.out.println(t);
            return session.close();
        });
}
Unfortunately, the previous code disregards the first entity, KafkaState: state.persist() returns a Uni that is never chained into the pipeline, so it is never subscribed to and the first insert is never executed.
After many attempts I found the following solution, which I hope can be useful to others:
@ActivateRequestContext
public Uni<Void> persist(ConsumerRecord<Long, String> record) {
    return session
            .withTransaction(t -> {
                // first insert: record the offset of the consumed message
                KafkaState state = new KafkaState();
                state.topic = record.topic();
                state.partition = record.partition();
                state.offsetN = record.offset();
                return state.persist();
            })
            // any failure of the first insert (e.g. a duplicate offset) becomes a null item
            .onFailure().recoverWithNull()
            // t is the persisted KafkaState, or null if the first insert failed
            .chain(t -> {
                if (t != null) {
                    // the offset was stored for the first time: run the business logic
                    Event event = new Event();
                    event.key = record.key();
                    event.message = record.value();
                    return event.persistAndFlush().replaceWithVoid();
                } else {
                    // duplicate message: skip the second insert
                    return Uni.createFrom().nullItem();
                }
            })
            .onTermination().call(() -> session.close());
}
The two DB insert statements are chained through the chain(...) method. In between, the onFailure() operation catches any failure coming from the first insert and, in case of error, replaces the result of the previous stage with null. The following stage in the pipeline (the function passed to chain) therefore receives a null item when the first insert failed, and can skip the second insert.
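Two closing notes, both additions of mine rather than part of the solution above. First, onFailure() as written recovers from any failure of the first insert, not only duplicate offsets; Mutiny lets you restrict the recovery to a specific exception type, so, assuming the unique-constraint violation surfaces as Hibernate's ConstraintViolationException, a stricter variant could be:

.onFailure(org.hibernate.exception.ConstraintViolationException.class).recoverWithNull()

Second, a minimal sketch of how persist(...) could be invoked from an incoming channel with SmallRye Reactive Messaging; the channel name "events" is an assumption:

import io.smallrye.mutiny.Uni;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@Incoming("events")
public Uni<Void> consume(ConsumerRecord<Long, String> record) {
    // the connector acknowledges the message once the returned Uni completes
    return persist(record);
}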