Tags: apache-kafka, apache-kafka-streams, spring-cloud-stream-binder-kafka

How to consume the latest records from the topic using Spring Cloud Stream?


I have a Kafka Streams processor that consumes three topics and tries to merge (join) them on the key. After joining successfully, it does some aggregation and then pushes the results to the target topic. When the application runs for the first time, it tries to consume all data from those topics. Two of those topics are used like lookup tables, so for them I do need to consume all data from the beginning. But the third is my main topic, and I need to consume it only from the latest offset. However, my application consumes all three topics from the beginning. So I want to consume two topics from the start and one from the latest. I'm using Spring Cloud Stream with the Kafka Streams binder. Here are my configs and some code snippets:

Application.yaml :

spring.cloud.stream.function.definition: processName;
spring.cloud.stream.kafka.streams.binder.functions.processName.applicationId: myappId
spring.cloud.stream.bindings.processName-in-0.destination: mainTopic
spring.cloud.stream.bindings.processName-in-0.consumer.group: mainTopic-cg
spring.cloud.stream.bindings.processName-in-0.consumer.startOffset: latest
spring.cloud.stream.bindings.processName-in-1.destination: secondTopic
spring.cloud.stream.bindings.processName-in-1.consumer.group: secondTopic-cg
spring.cloud.stream.bindings.processName-in-1.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-in-2.destination: thirdTopic
spring.cloud.stream.bindings.processName-in-2.consumer.group: thirdTopic-cg
spring.cloud.stream.bindings.processName-in-2.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-out-0.destination: otputTopics

spring.cloud.stream.kafka.streams.binder.replication-factor: 1
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 10000
spring.cloud.stream.kafka.streams.binder.configuration.state.dir: state-store

Streams processor:

@Bean
public Function<KStream<String, MainTopic>,
        Function<KTable<String, SecondTopic>,
                Function<KTable<String, ThirdTopic>,
                        KStream<String, OutputTopic>>>> processName() {
    return mainTopicKStream -> (
            secondTopicTable -> (
                    thirdTopicKTable -> (
                            aggregateOperations.AggregateByAmount(
                                    joinOperations.JoinSecondThirdTopic(mainTopicKStream, secondTopicTable, thirdTopicKTable)
                                            .filter((k, v) -> v.IsOk() != 4)
                                            .groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.OutputTopic()))
                                    , TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1))
                            ).toStream()
                    )
            ));
}

Solution

  • A couple of points. When you have a Kafka Streams application using the Spring Cloud Stream binder, you do not need to set group information on the bindings; your applicationId setup is sufficient. Therefore, I suggest removing those three group properties from your configuration.
  • Another thing: when using the Kafka Streams binder, any consumer-specific binding properties need to be set under spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.... This is mentioned in this section of the docs. Please change your startOffset configuration accordingly.
  • The same section of the docs also explains the semantics of startOffset in the Kafka Streams binder. Basically, the start offset property is honored only when you start the application for the first time, i.e. when there are no committed offsets. The default in that case is earliest, but you can override it to latest per binding using the property.
  • You can materialize the incoming KTables as state stores and thus have access to all the lookup data.
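Applied to the configuration in the question, the relevant properties might look like the sketch below. It is not a complete config, and the state-store names passed to materializedAs are hypothetical:

```yaml
# startOffset moved under the Kafka Streams binder's bindings namespace;
# it is honored only on first start, when no committed offsets exist yet
spring.cloud.stream.kafka.streams.bindings.processName-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.processName-in-1.consumer.startOffset: earliest
spring.cloud.stream.kafka.streams.bindings.processName-in-2.consumer.startOffset: earliest

# Optionally materialize the lookup KTables as named state stores
# (store names here are assumptions)
spring.cloud.stream.kafka.streams.bindings.processName-in-1.consumer.materializedAs: second-topic-store
spring.cloud.stream.kafka.streams.bindings.processName-in-2.consumer.materializedAs: third-topic-store
```

Note that the group and the original spring.cloud.stream.bindings...consumer.startOffset properties from the question would be removed rather than kept alongside these.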