java, spring, spring-boot, apache-kafka, spring-kafka

Manually acknowledge consumption of Kafka event A only after producing event B


I have a case where I have to consume event A, do some processing, then produce event B. My concern is what happens if the processing crashes and the application cannot produce B even though it has already consumed A. My approach is to acknowledge only after successfully publishing B. Is that correct, or should I implement another solution for this case?

@KafkaListener(
        id = TOPIC_ID,
        topics = TOPIC_ID,
        groupId = GROUP_ID,
        containerFactory = LISTENER_CONTAINER_FACTORY
)
public void listen(List<Message<A>> messages, Acknowledgment acknowledgment) {

    try {
        final AEvent aEvent = messages.stream()
                .filter(message -> null != message.getPayload())
                .map(Message::getPayload)
                .findFirst()
                .get();

    processDao.doSomeProcessing() // returns a Mono<Example> by calling an external API
                .subscribe(
                        response -> {
                            ProducerRecord<String, BEvent> bEventRecord =
                                    new ProducerRecord<>(TOPIC_ID, null, buildBEvent());

                            ListenableFuture<SendResult<String, BEvent>> future = kafkaProducerTemplate.send(bEventRecord);
                            future.addCallback(new ListenableFutureCallback<SendResult<String, BEvent>>() {
                                @Override
                                public void onSuccess(SendResult<String, BEvent> BEventSendResult) {
                                    //TODO: do when event published successfully
                                }

                                @Override
                                public void onFailure(Throwable exception) {
                                    exception.printStackTrace();
                                    throw new ExampleException();
                                }
                            });
                        },
                        error -> {
                            error.printStackTrace();
                            throw new ExampleException();
                        }
                );
        acknowledgment.acknowledge(); // ??
    } catch (ExampleException exception) {
        exception.printStackTrace();
    }
}

Solution

  • You can't manage Kafka "acknowledgments" when using async code such as Reactor.

    Kafka does not track a discrete ack for each record in a topic/partition, just the last committed offset for the partition.

    If you process two records asynchronously, you will have a race as to which offset will be committed first.

    You need to perform the sends on the listener container thread to maintain proper ordering.
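    As an illustration only, here is a minimal sketch of a blocking version of the listener, reusing the names from the question (processDao, kafkaProducerTemplate, buildBEvent()); the 30-second timeout is an arbitrary choice. Blocking on both the Mono and the send keeps all the work on the listener container thread, so the acknowledgment happens only after B has actually been published:

    @KafkaListener(
            id = TOPIC_ID,
            topics = TOPIC_ID,
            groupId = GROUP_ID,
            containerFactory = LISTENER_CONTAINER_FACTORY
    )
    public void listen(List<Message<A>> messages, Acknowledgment acknowledgment) {
        try {
            // Block on the reactive call so the result is available on the container thread
            // (it could be passed to buildBEvent() if needed).
            Example response = processDao.doSomeProcessing().block();

            // Block on the send as well; get() throws if publishing B fails or times out.
            kafkaProducerTemplate.send(buildBEvent()).get(30, TimeUnit.SECONDS);

            // B is published, so it is now safe to commit the offset for A.
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Rethrow (or simply don't acknowledge) so the container's error handling
            // can redeliver A instead of silently dropping it.
            throw new IllegalStateException("Could not process A and publish B", e);
        }
    }

    With this shape there is no race between records: each record (or batch) is fully processed, its B event is confirmed by the broker, and only then is the offset committed.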