Manually acknowledge Kafka Event A consuming after producing event B

I have a case where I have to consume event A and do some processing, then produce the event B. So my problem is what would happen is the processing crashed and the application couldn't produce B while it consumed already A. My approach is to acknowledge after successfully publishing B, am I correct or should implement another solution for this case?

        id = TOPIC_ID,
        topics = TOPIC_ID,
        groupId = GROUP_ID,
        containerFactory = LISTENER_CONTAINER_FACTORY
public void listen(List<Message<A>> messages, Acknowledgment acknowledgment) {

    try {
        final AEvent aEvent =
                .filter(message -> null != message.getPayload())

        processDao.doSomeProcessing() // returns a Mono<Example> by calling an externe API
                        response -> {
                            ProducerRecord<String, BEvent> BEventRecord = new ProducerRecord<>(TOPIC_ID, null, BEvent);

                            ListenableFuture<SendResult<String, BEvent>> future = kafkaProducerTemplate.send(buildBEvent());
                            future.addCallback(new ListenableFutureCallback<SendResult<String, BEvent>>() {
                                public void onSuccess(SendResult<String, BEvent> BEventSendResult) {
                                    //TODO: do when event published successfully

                                public void onFailure(Throwable exception) {
                                    throw new ExampleException();
                        error -> {
                            throw new ExampleException();
        acknowledgment.acknowledge(); // ??
    } catch (ExampleException) {


  • You can't manage kafka "acknowledgments" when using async code such as reactor.

    Kafka does not manage discrete acks for each topic/partition, just the last committed offset for the partition.

    If you process two records asynchronously, you will have a race as to which offset will be committed first.

    You need to perform the sends on the listener container thread to maintain proper ordering.