Tags: spring-integration, kafka-consumer-api, spring-integration-dsl

Spring Integration to read message from Kafka topic based on timestamp


While using Spring Kafka, I am able to read messages from the topic based on a timestamp with the code below:

                ConsumerRecords<String, String> records = consumer.poll(100);
                if (flag) {
                    Map<TopicPartition, Long> query = new HashMap<>();
                    query.put(new TopicPartition(kafkaTopic, 0), millisecondsFromEpochToReplay);

                    // Find the earliest offset whose timestamp is >= the requested time
                    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
                    if (result != null) {
                        // Discard the records already polled; they precede the seek
                        records = ConsumerRecords.empty();
                        result.entrySet().stream()
                                .filter(entry -> entry.getValue() != null) // null when no record matches the timestamp
                                .forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
                    }

                    flag = false;
                }

How can the same functionality be achieved using the Spring Integration DSL with a KafkaMessageDrivenChannelAdapter? How do I set up the IntegrationFlow so that messages are read from the topic based on the timestamp?


Solution

  • Configure the adapter's listener container with a ConsumerAwareRebalanceListener and perform the lookup/seeks when the partitions are assigned.

    EDIT

    Using Spring Boot (but you can configure the container the same way, however you create it)...

    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.group-id=so54664761
    

    and

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

    @SpringBootApplication
    public class So54664761Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So54664761Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> template.send("so54664761", "foo");
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("so54664761", 1, (short) 1);
        }
    
        @Bean
        public IntegrationFlow flow(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
            ConcurrentMessageListenerContainer<String, String> container = container(containerFactory);
            return IntegrationFlows.from(new KafkaMessageDrivenChannelAdapter<>(container))
                    .handle(System.out::println)
                    .get();
        }
    
        @Bean
        public ConcurrentMessageListenerContainer<String, String> container(
                ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    
            ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("so54664761");
            container.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
    
                @Override
                public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                    System.out.println("Partitions assigned - do the lookup/seeks here");
                    // For example, seek each assigned partition to the first offset at or
                    // after the replay timestamp, as in the question's code
                    long millisecondsFromEpochToReplay = System.currentTimeMillis() - 60_000L; // e.g. replay the last minute
                    Map<TopicPartition, Long> query = new HashMap<>();
                    partitions.forEach(tp -> query.put(tp, millisecondsFromEpochToReplay));
                    Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
                    offsets.forEach((tp, offset) -> {
                        if (offset != null) { // null when no record matches the timestamp
                            consumer.seek(tp, offset.offset());
                        }
                    });
                }
    
            });
            return container;
        }
    
    }
    

    and

    Partitions assigned - do the lookup/seeks here
    GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2f5b2297, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so54664761, kafka_receivedTimestamp=1550241100112}]
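
    As an aside, the value passed to offsetsForTimes is plain epoch milliseconds. A minimal sketch of computing one (the class name, one-hour lookback, and fixed instant are illustrative choices, not from the original post):

    ```java
    import java.time.Duration;
    import java.time.Instant;

    public class ReplayTimestamp {

        // Computes the epoch-millis value to use as the replay timestamp,
        // i.e. "everything from `lookback` before `now`"
        static long millisSince(Instant now, Duration lookback) {
            return now.minus(lookback).toEpochMilli();
        }

        public static void main(String[] args) {
            Instant now = Instant.parse("2019-02-15T00:00:00Z");
            long ts = millisSince(now, Duration.ofHours(1));
            System.out.println(ts); // epoch millis for 2019-02-14T23:00:00Z
        }
    }
    ```

    In real use you would pass Instant.now() rather than a fixed instant; the fixed value just makes the arithmetic reproducible.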