Kafka Streams API: Session Window incompatible types


I have the following snippet:

groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

KTable<byte[], byte[]> mergedTable =
        groupedStream
            .reduce((aggregateValue, newValue) -> {
              try {
                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                Map<String, String> aggregateMap = MAPPER.readValue(new String(aggregateValue), HashMap.class);
                aggregateMap.forEach(recentMap::putIfAbsent);
                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
              } catch (Exception e) {
                LOG.warn("Couldn't aggregate key grouped stream\n", e);
              }
              return newValue;
            }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
            .suppress(Suppressed.untilWindowCloses(unbounded()));

I am getting the following compilation error:

Error:(164, 63) java: incompatible types: org.apache.kafka.streams.kstream.Suppressed<org.apache.kafka.streams.kstream.Windowed> cannot be converted to org.apache.kafka.streams.kstream.Suppressed<? super byte[]>

I know that if I inline the windowedBy like so:

        KTable<Windowed<byte[]>, byte[]> mergedTable =
                groupedStream
                        .windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO))
                        .reduce((aggregateValue, newValue) -> {
                            try {
                                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                Map<String, String> aggregateMap = MAPPER.readValue(new String(aggregateValue), HashMap.class);
                                aggregateMap.forEach(recentMap::putIfAbsent);
                                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
                            } catch (Exception e) {
                                LOG.warn("Couldn't aggregate key grouped stream\n", e);
                            }
                            return newValue;
                        }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
                        .suppress(Suppressed.untilWindowCloses(unbounded()));

It works, but I am not sure how to split the windowedBy call out into its own statement.


Solution

  • There are two issues here.

    The first issue is that KGroupedStream.windowedBy(SessionWindows) does not modify the KGroupedStream it is called on. It returns a new SessionWindowedKStream<K, V>, and in your first example

    groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

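    you are not capturing the returned SessionWindowedKStream in a variable. The later groupedStream.reduce(...) call is therefore a plain, non-windowed reduce that yields a KTable<byte[], byte[]>, whereas Suppressed.untilWindowCloses only applies to a table whose key type is Windowed. That mismatch is what the compiler is reporting.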

    The second issue is that in your first code example you declare

    KTable<byte[], byte[]> mergedTable

    when it should be

    KTable<Windowed<byte[]>, byte[]> mergedTable

    as it is in your second example.

    If you change the code to

    SessionWindowedKStream<byte[], byte[]> sessionWindowedKStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
    
    KTable<Windowed<byte[]>, byte[]> mergedTable = 
          sessionWindowedKStream
                    .reduce((aggregateValue, newValue) -> {...
    

    Then it should compile fine.
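
To see why capturing the return value changes the key type, here is a minimal model of the relationship using only the JDK. Every class in it (`GroupedModel`, `SessionWindowedModel`, `TableModel`, `WindowedKeyModel`, `WindowedBySketch`) is a hypothetical stand-in invented for illustration, not a Kafka Streams class, and no real windowing happens. It only mirrors the shape of the API: `windowedBy()` hands back a new object, and only that object's `reduce()` produces a table with a wrapped key.

```java
import java.util.function.BinaryOperator;

// Hypothetical stand-ins that only model the type relationships in this answer.
// They are NOT the Kafka Streams classes and perform no windowing.

/** Models Windowed<K>: the original key wrapped in a window-aware key type. */
final class WindowedKeyModel<K> {
    final K key;
    WindowedKeyModel(K key) { this.key = key; }
}

/** Models KTable<K, V>, reduced here to a single entry. */
final class TableModel<K, V> {
    final K key;
    final V value;
    TableModel(K key, V value) { this.key = key; this.value = value; }
}

/** Models SessionWindowedKStream<K, V>: reduce() yields a table keyed by WindowedKeyModel<K>. */
final class SessionWindowedModel<K, V> {
    private final K key;
    private final V first;
    private final V second;

    SessionWindowedModel(K key, V first, V second) {
        this.key = key;
        this.first = first;
        this.second = second;
    }

    TableModel<WindowedKeyModel<K>, V> reduce(BinaryOperator<V> reducer) {
        return new TableModel<>(new WindowedKeyModel<>(key), reducer.apply(first, second));
    }
}

/** Models KGroupedStream<K, V>: windowedBy() returns a new object and leaves this one unchanged. */
final class GroupedModel<K, V> {
    private final K key;
    private final V first;
    private final V second;

    GroupedModel(K key, V first, V second) {
        this.key = key;
        this.first = first;
        this.second = second;
    }

    SessionWindowedModel<K, V> windowedBy() {
        return new SessionWindowedModel<>(key, first, second);
    }

    TableModel<K, V> reduce(BinaryOperator<V> reducer) {
        return new TableModel<>(key, reducer.apply(first, second));
    }
}

final class WindowedBySketch {
    /** Shape of the first example: the windowedBy() result is discarded. */
    static TableModel<String, String> discarded(GroupedModel<String, String> grouped) {
        grouped.windowedBy();                   // return value thrown away, grouped is unchanged
        return grouped.reduce(String::concat);  // non-windowed reduce: key type stays String
    }

    /** Shape of the fix: reduce() is called on the object windowedBy() returned. */
    static TableModel<WindowedKeyModel<String>, String> captured(GroupedModel<String, String> grouped) {
        SessionWindowedModel<String, String> windowed = grouped.windowedBy();
        return windowed.reduce(String::concat); // key type is now WindowedKeyModel<String>
    }
}
```

Swapping the two return types in `WindowedBySketch` would fail to compile, which is the same kind of mismatch as in the question.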

    HTH Bill
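
As a side note, the merge that the reducer in the question appears to be aiming for (values from the newer record win, keys present only in the aggregate are carried over) can be checked in isolation with plain maps. This is a minimal sketch assuming string-to-string maps as in the question. `MergeSketch` and `merge` are hypothetical names, and the JSON serialization and deserialization steps are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that isolates the map merge from the reducer in the question.
final class MergeSketch {
    /**
     * Returns a new map holding every entry of {@code recent}, plus those entries
     * of {@code aggregate} whose keys are missing from {@code recent}.
     * Neither input map is modified.
     */
    static Map<String, String> merge(Map<String, String> aggregate, Map<String, String> recent) {
        Map<String, String> merged = new HashMap<>(recent);
        // putIfAbsent leaves keys that already came from `recent` untouched.
        aggregate.forEach(merged::putIfAbsent);
        return merged;
    }
}
```

Keeping the merge in a small pure method like this makes it possible to unit test it without building a topology.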