Search code examples
javaspringapache-kafkakafka-consumer-apispring-kafka

Make a service after restart to skip unread Kafka messages, but use the same consumer group as before


We have a reactive Kafka consumer configured like this:

spring.kafka.consumer.group-id=MY-CONSUMER-GROUP
@Bean
public ReactiveKafkaConsumerTemplate<UUID, MyEvent> createConsumer(@Autowired KafkaProperties properties) {
    return new ReactiveKafkaConsumerTemplate<>(
        ReceiverOptions.<UUID, MyEvent>create(new HashMap<>(properties.buildConsumerProperties()))
            .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
            .subscription(List.of("MY-TOPIC")));
}

This ensures that when the consumer is created the first time, it doesn't read all the messages from the beginning, but reads only the new messages, see e.g. here.

We'd like to have this behaviour always, after each restart of the service. The messages are relevant for the service only if they are produced while the service is running.

However, the documentation says:

Once a consumer group has an offset written then this configuration parameter no longer applies. If the consumers in the consumer group are stopped and then restarted, they would pick up consuming from the last offset.

However, when our service is down for a while, we want it to skip the messages which were produced into the topic in the meantime, we want it to read from the same consumer group, but also from the end only, not from the last offset.

How to achieve it?

We use the consumer like this:

@EventListener(value = ApplicationReadyEvent.class)
public void listen() {
    consumerTemplate
        .receiveAutoAck()
        .concatMap(consumerRecord -> handleEvent(consumerRecord.key(), consumerRecord.value())
            .onErrorResume(__ -> Mono.empty())
        )
        .subscribe();
}

private Mono<MyEvent> handleEvent(@NonNull UUID key, @NonNull MyEvent event) {
    ....
}

I found a method with a promising name seekToEnd(), but:

  1. I am not sure if it does what I want :)
  2. As it returns void, I have no clue how to use it in the reactive call chain.

Solution

  • With reactor-kafka, use the addAssignListener receiver option.

    Then, when partitions are assigned, you can call seekToEnd() on each ReceiverPartition (the listener gets a collection of them).

    See the documentation.

    In the configuration mentioned in the question, the place for the configuration code is here:

    @Bean
    public ReactiveKafkaConsumerTemplate<UUID, MyEvent> createConsumer(@Autowired KafkaProperties properties) {
        return new ReactiveKafkaConsumerTemplate<>(
            ReceiverOptions.<UUID, MyEvent>create(new HashMap<>(properties.buildConsumerProperties()))
                //==================================================
                .addAssignListener(partitions ->
                    partitions.forEach(ReceiverPartition::seekToEnd))
                //==================================================
                .subscription(List.of("MY-TOPIC")));
    }