Kafka streams suppress does not return any values


I'm struggling with a relatively simple windowed word count example. I want to emit only the final result for each window, but I don't receive anything at all.

s
    ...
    .groupBy((key, value) -> value)
    .windowedBy(of(ofSeconds(5))
        .advanceBy(ofSeconds(3))
        .grace(ofSeconds(2)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
        "counts-store").withRetention(ofSeconds(7)))
    .suppress(untilWindowCloses(unbounded()))
    .toStream()
    .map((key, value) -> new KeyValue<>(key.key(), value))
    .to(outputTopicName, produced);

I'm piping in some input:

inputWords.pipeInput(new TestRecord<>("word", "a b c", now));
inputWords.pipeInput(new TestRecord<>("word", "a c d c", now.plus(ofSeconds(6))));
inputWords.pipeInput(new TestRecord<>("word", "", now.plus(Duration.ofDays(1))));

But nothing gets emitted. Does anyone know a possible solution?

As you can see, I'm already using grace and retention, which others suggested might help, but it doesn't. If I comment out the suppress line, everything works.
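
As a sanity check on the timing, the window arithmetic can be worked through by hand. The following is a minimal plain-Java sketch, not Kafka Streams code: the class and method names are hypothetical, the base timestamp of 30 seconds is an arbitrary stand-in for `now`, and it assumes that a window counts as closed once stream time reaches the window end plus the grace period.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowCloseSketch {
    // Window settings copied from the topology above.
    static final long SIZE_MS = 5_000L;
    static final long ADVANCE_MS = 3_000L;
    static final long GRACE_MS = 2_000L;

    /** Start times of every hopping window that contains the timestamp. */
    static List<Long> windowStartsFor(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window (aligned to the advance, never negative) that still covers the timestamp.
        long start = (Math.max(0L, timestampMs - SIZE_MS + ADVANCE_MS) / ADVANCE_MS) * ADVANCE_MS;
        while (start <= timestampMs) {
            starts.add(start);
            start += ADVANCE_MS;
        }
        return starts;
    }

    /** Assumption: a window is closed once stream time >= window end + grace. */
    static boolean isClosed(long windowStartMs, long streamTimeMs) {
        return streamTimeMs >= windowStartMs + SIZE_MS + GRACE_MS;
    }

    public static void main(String[] args) {
        long now = 30_000L; // hypothetical base timestamp, stands in for `now`
        long[] wordRecordTimes = {now, now + 6_000L};
        // Stream time after the third (empty) record, one day later.
        long streamTime = now + 86_400_000L;

        for (long t : wordRecordTimes) {
            for (long start : windowStartsFor(t)) {
                System.out.println("record@" + t + " -> window [" + start + ", "
                        + (start + SIZE_MS) + ") closed=" + isClosed(start, streamTime));
            }
        }
    }
}
```

Under these assumptions every window touched by the first two records is closed by the time the third record arrives, so the timing of the test input alone should not prevent suppress from emitting.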


Solution

  • You have to provide valid Serdes for the Materialized view of your count so that Kafka Streams can derive the correct windowed Serdes for the internal suppress processor. Otherwise that processor falls back to the default key Serde, which can make serialization fail. I get the following exception in KTableSuppressProcessor.buffer():

    // Please check whether you get this exception:
    java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
    
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
        at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
    

    Provide explicit key and value Serdes for the Materialized view counts-store and you should get the expected output:

    Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
        .withRetention(ofSeconds(7))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
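
To illustrate the failure mode the stack trace points to, here is a minimal plain-Java sketch. It does not use Kafka: `Serializer`, `StringSerializer`, `WindowedKey`, `WindowedKeySerializer` and `bufferKey` are hypothetical stand-ins that only mimic the shape of the real classes. It shows how a serializer typed for String keys, when handed a windowed key through an unchecked cast, fails with a ClassCastException, while a serializer that matches the key type works.

```java
import java.nio.charset.StandardCharsets;

public class DefaultSerdeCastSketch {
    /** Hypothetical stand-in for a generic serializer interface. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Hypothetical stand-in for a windowed key: the original key plus window bounds. */
    static final class WindowedKey {
        final String key;
        final long startMs;
        final long endMs;

        WindowedKey(String key, long startMs, long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    /** Plays the role of the default key serializer (plain String keys). */
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Plays the role of a serializer that understands windowed keys. */
    static final class WindowedKeySerializer implements Serializer<WindowedKey> {
        @Override
        public byte[] serialize(WindowedKey data) {
            String encoded = data.key + "@" + data.startMs + "/" + data.endMs;
            return encoded.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Mimics a buffer that serializes whatever key it gets with the configured serializer. */
    @SuppressWarnings("unchecked")
    static byte[] bufferKey(Serializer<?> keySerializer, Object key) {
        // The unchecked cast compiles; a type mismatch only surfaces when serialize runs.
        return ((Serializer<Object>) keySerializer).serialize(key);
    }

    public static void main(String[] args) {
        WindowedKey key = new WindowedKey("a", 0L, 5_000L);
        try {
            bufferKey(new StringSerializer(), key);
        } catch (ClassCastException e) {
            System.out.println("String serializer failed on windowed key: " + e.getMessage());
        }
        byte[] bytes = bufferKey(new WindowedKeySerializer(), key);
        System.out.println("Windowed serializer produced " + bytes.length + " bytes");
    }
}
```

In the real topology, setting the key and value Serdes explicitly on the Materialized view plays the role of choosing the matching serializer in this sketch.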