Im struggling on a relatively simple Windowed Word Count example. I'm trying to get only the windowed results but does not receive anything at all.
KStream<String, Long> sl = s
...
.groupBy((key, value) -> value)
.windowedBy(of(ofSeconds(5))
.advanceBy(ofSeconds(3))
.grace(ofSeconds(2)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"counts-store").withRetention(ofSeconds(7)))
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value))
.to(outputTopicName, produced);
I'm piping in some input:
inputWords.pipeInput(new TestRecord<>("word", "a b c", now));
inputWords.pipeInput(new TestRecord<>("word", "a c d c", now.plus(ofSeconds(6))));
inputWords.pipeInput(new TestRecord<>("word", "", now.plus(Duration.ofDays(1))));
But nothing gets emitted. Someone knows a possible solution?
As you can see i'm already using grace and retention, as others wrote this could help but it's actually not helping. On commenting suppress line everything works.
You have to provide valid Serdes
for your count Materialized
view so Kafka Stream can correctly provide valid Window Serdes for internal suppress processor, if not then this processor will pick default key serdes which may lead to serialization not work correctly, I get following Exception in KTableSuppressProcessor.buffer()
:
//please check if you get this exception
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
Correctly provide valid Serde
for Materialized view counts-store
and you should get expected output:
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
.withRetention(ofSeconds(7))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())