I have a Kafka Streams processor that consumes three topics and joins them on the key. After a successful join, it performs some aggregation and pushes the results to a target topic. When the application runs for the first time, it consumes all of the data from those topics. Two of the topics are used like lookup tables, so I need to consume them from the beginning. But the third is my main topic, so I need to consume it from the latest offset. However, my application consumes all three topics from the beginning. In short: I want to consume two topics from the start and one from the latest. I'm using Spring Cloud Stream with the Kafka Streams binder. Here are my configs and some code snippets:
Application.yaml:
spring.cloud.stream.function.definition: processName
spring.cloud.stream.kafka.streams.binder.functions.processName.applicationId: myappId
spring.cloud.stream.bindings.processName-in-0.destination: mainTopic
spring.cloud.stream.bindings.processName-in-0.consumer.group: mainTopic-cg
spring.cloud.stream.bindings.processName-in-0.consumer.startOffset: latest
spring.cloud.stream.bindings.processName-in-1.destination: secondTopic
spring.cloud.stream.bindings.processName-in-1.consumer.group: secondTopic-cg
spring.cloud.stream.bindings.processName-in-1.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-in-2.destination: thirdTopic
spring.cloud.stream.bindings.processName-in-2.consumer.group: thirdTopic-cg
spring.cloud.stream.bindings.processName-in-2.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-out-0.destination: outputTopics
spring.cloud.stream.kafka.streams.binder.replication-factor: 1
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 10000
spring.cloud.stream.kafka.streams.binder.configuration.state.dir: state-store
Streams processor:
public Function<KStream<String, MainTopic>,
        Function<KTable<String, SecondTopic>,
        Function<KTable<String, ThirdTopic>,
        KStream<String, OutputTopic>>>> processName() {
    return mainTopicKStream ->
        secondTopicTable ->
            thirdTopicKTable ->
                aggregateOperations.AggregateByAmount(
                    joinOperations.JoinSecondThirdTopic(mainTopicKStream, secondTopicTable, thirdTopicKTable)
                        .filter((k, v) -> v.IsOk() != 4)
                        .groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.OutputTopic())),
                    TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1))
                ).toStream();
}
A couple of points. When you have a Kafka Streams application using the Spring Cloud Stream binder, you do not need to set group information on the bindings; your applicationId setup is sufficient. Therefore, I suggest removing those three group properties from your configuration. Another thing: any consumer-specific binding properties, when using the Kafka Streams binder, need to be set under spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.... This is mentioned in this section of the docs. Please change your startOffset configuration accordingly. Also, look at the same section of the docs for an explanation of the semantics of startOffset in the Kafka Streams binder. Basically, the startOffset property is honored only when you start the application for the first time, i.e., when there are no committed offsets. The default in that case is earliest, but you can override it to latest using the property. You can also materialize the incoming KTables as state stores and thus have access to all the lookup data.
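For example, the input bindings from the question could be adjusted along these lines (binding and topic names taken from the question; the group properties are dropped and startOffset moves under the Kafka Streams binder namespace):

```yaml
spring.cloud.stream.function.definition: processName
spring.cloud.stream.kafka.streams.binder.functions.processName.applicationId: myappId

spring.cloud.stream.bindings.processName-in-0.destination: mainTopic
spring.cloud.stream.bindings.processName-in-1.destination: secondTopic
spring.cloud.stream.bindings.processName-in-2.destination: thirdTopic

# startOffset is a Kafka Streams binder consumer property, so it belongs under
# spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer, not under
# the core spring.cloud.stream.bindings.<binding-name>.consumer section.
spring.cloud.stream.kafka.streams.bindings.processName-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.processName-in-1.consumer.startOffset: earliest
spring.cloud.stream.kafka.streams.bindings.processName-in-2.consumer.startOffset: earliest
```

Keep in mind that these startOffset values only take effect on the very first start of the application id; once offsets have been committed, subsequent restarts resume from the committed offsets.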
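For materializing the incoming KTables as state stores, the Kafka Streams binder exposes a materializedAs consumer property on KTable bindings. A minimal sketch, where the store names are illustrative, not from the question:

```yaml
# Materialize each lookup KTable into a named state store so all of the
# lookup data is locally queryable by the application.
spring.cloud.stream.kafka.streams.bindings.processName-in-1.consumer.materializedAs: second-topic-store
spring.cloud.stream.kafka.streams.bindings.processName-in-2.consumer.materializedAs: third-topic-store
```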