
How to seek Offset for Time Stamp with Spring for Kafka


We are using spring-kafka to consume messages that have to be forwarded to the frontend as server sent events (SSE).

When a user logs in she is supposed to see all the events she missed out on since her last session.

Current implementation uses the ConsumerSeekCallback as described in this answer

However, that callback does not support the offsetsForTimes method of the underlying KafkaConsumer (KafkaConsumer#offsetsForTimes).

So I have to use seekToBeginning and filter by time stamp, which will cause problems when there are a lot of messages ...

Is there any other way to receive only the messages since a given time stamp? Maybe a safe way to use the consumer directly?


Solution

  • As Gary Russell pointed out above, ConsumerSeekCallback is legacy, so it's a no-go ... and I won't open a GitHub issue ...

    I was finally able to achieve my objective:

    When a user logs in she is supposed to see all the events she missed out on since her last session.

    by handling all new subscriptions in an EventListener for the ListenerContainerIdleEvent, where the consumer is available as part of the event data:

        @EventListener(condition = "event.listenerId.startsWith('qux-')")
        public void idleEventHandler(ListenerContainerIdleEvent event) {
    
            // find new subscriptions
            Collection<EventListenerSubscription> newSubscriptions = 
                    subscriptions.stream().filter(s -> s.isNew())
                    .collect(Collectors.toList());
    
            if (!newSubscriptions.isEmpty()) {
    
                // mark subscriptions as not new
                newSubscriptions.forEach(s -> s.setNew(false));
    
                // compute the oldest time stamp
                OptionalLong oldestTimeStamp = 
                        newSubscriptions.stream()
                        .mapToLong(EventListenerSubscription::getLastTimeStamp)
                        .min();
    
                if (oldestTimeStamp.isPresent()) {
    
                    // seek on topic for oldest time stamp
                    Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                    timestampsToSearch.put(new TopicPartition(eventTopic, 0),
                                           oldestTimeStamp.getAsLong());
                    Consumer<?, ?> consumer = event.getConsumer();
                    consumer.offsetsForTimes(timestampsToSearch).forEach((partition, offset) -> {
                        // offset is null when no message at or after the time stamp exists
                        if (offset != null) {
                            consumer.seek(partition, offset.offset());
                        }
                    });
                }
            }
        }
    

    I determine the oldest time stamp across all new subscriptions, mark these subscriptions as not new, and use the consumer to seek back on the topic to the oldest time stamp.
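    Stripped of the Kafka plumbing, the time stamp selection is just a stream min-reduction. A standalone sketch, using a hypothetical Subscription record in place of EventListenerSubscription:

    ```java
    import java.util.List;
    import java.util.OptionalLong;

    public class OldestTimestamp {

        // hypothetical stand-in for EventListenerSubscription
        record Subscription(boolean isNew, long lastTimeStamp) {}

        // oldest last-seen time stamp across the new subscriptions, if any
        static OptionalLong oldest(List<Subscription> subscriptions) {
            return subscriptions.stream()
                    .filter(Subscription::isNew)
                    .mapToLong(Subscription::lastTimeStamp)
                    .min();
        }
    }
    ```

    Returning OptionalLong keeps the "no new subscriptions" case explicit, which is why the seek above is guarded by isPresent().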

    In order to get the container idle event, the idle interval has to be configured in the container properties, as described here.
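    With Spring Boot, the idle interval can be set via a configuration property (the 5-second value below is an arbitrary choice):

    ```properties
    # publish a ListenerContainerIdleEvent when no records arrive for 5 seconds
    spring.kafka.listener.idle-event-interval=5000
    ```

    Without Boot, the equivalent setting is setIdleEventInterval on the listener container factory's ContainerProperties.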

    The KafkaListener will then take care of sending the old events to the (formerly new) subscribers:

        @KafkaListener(id = "qux", topics = { "${app.event.topic}" }, errorHandler = "kafkaListenerErrorHandler")
        public void receive(@Payload Event event, @Headers MessageHeaders headers) throws JsonProcessingException {

            // the record's time stamp, taken from the Kafka message headers
            long timestamp = (long) headers.get(KafkaHeaders.RECEIVED_TIMESTAMP);

            // collect the subscribers not marked as new
            Collection<EventListenerSubscription> oldSubscriptions = 
                    subscriptions.stream().filter(s -> !s.isNew())
                    .collect(Collectors.toList());
    
            for (EventListenerSubscription s : oldSubscriptions) {
                if (s.getLastTimeStamp() < timestamp) {
                    s.addMessage(event, timestamp);
                }
            }
        }
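    The delivery rule in the listener — forward an event only to subscribers that have not yet seen it — can be sketched standalone as well (Subscription is again a hypothetical stand-in for EventListenerSubscription):

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class DeliveryFilter {

        // hypothetical stand-in for EventListenerSubscription
        static class Subscription {
            final long lastTimeStamp;
            final List<String> messages = new ArrayList<>();
            Subscription(long lastTimeStamp) { this.lastTimeStamp = lastTimeStamp; }
        }

        // forward the event only to subscribers whose last-seen time stamp is older
        static void deliver(List<Subscription> subscriptions, String event, long timestamp) {
            for (Subscription s : subscriptions) {
                if (s.lastTimeStamp < timestamp) {
                    s.messages.add(event);
                }
            }
        }
    }
    ```

    Because the consumer seeks to the oldest time stamp across all new subscribers, some replayed records predate a given subscriber's last session; the per-subscriber comparison filters those out.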