Search code examples
apache-kafka-streams

KStreamWindowAggregate seems to share streamtime causing windows to expire


Messages discarded due to expired windows even though for that particular key the window should not be closed

I want to group messages consumed from a single partition topic and window these messages by 30 seconds based on the eventtime. In order to avoid immediate processing I invoke the suppress method and also use the .grace method. Once the windows get closed (after 30 seconds + grace period of 0) I expect the final result to be added to a topic. The messages I consume from the topic have two different keys: 300483976 and 300485339. The messages I consume increase the eventtime by 10 seconds. I read that the streamtime is only increased based on new messages coming in that increase the eventtime. This is also what I experience. However the issue I'm seeing is the following:

I consume the first 10 messages for key 300483976. Based on the method "KStreamWindowAggregate.process" I notice that the internalProcessorContext.streamTime() does increase every time, based on the latest consumed message. After processing the 10 messages the final eventtime is now starttime + 300 seconds. After that moment the messages for key 300485339 are consumed. All, but the latest messages are marked as expired and discarded with the message "Skipping record for expired window.". It seems that the internalProcessorContext.streamTime() still remembers the latest value of the first run and therefore discards the messages with key 300485339.

stream
                .groupByKey(Grouped.with(Serdes.String(), new DataSerde()))
                .windowedBy(
                        TimeWindows.of(Duration.ofSeconds(30))
                                .grace(Duration.ofMillis(0))) // override the default of 24 hours
                .aggregate(Data::new, transform(), materialize())
                .filter((key, value) -> {
                    log.info("agg {} {}", key, value.toString());
                    return true;
                })
                .suppress(
                        Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream();

I would expect that as the messages are grouped by key (300483976 and 300485339) the streamtime would not be "shared". I would expect that there will be separate windows for key 300483976 and key 300485339. Any idea what is wrong? I'm using kafka-streams 2.1.0 and a timestampextractor that gets the eventtime from a field in the message.

UPDATE

I did some additional testing and adapted an example that doesn't use aggregate, but does show the same issue with the stream times:

    @Test
    public void shouldSupportFinalResultsForTimeWindows() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, Long> valueCounts = builder
                .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
                .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
                .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
        valueCounts
                .suppress(untilWindowCloses(unbounded()))
                .toStream()
                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
                .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        valueCounts
                .toStream()
                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
                .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final ConsumerRecordFactory<String, String> recordFactory =
                new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
            driver.pipeInput(recordFactory.create("input", "k2", "v1", 7L));
            // note this last records sets the streamtime to 7L causing the next messages to be discarded
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
        }
    }

In the above example the second message sets the streamtime to 7L making the created window of 0 to 2 get closed even though the message has a different key. This als causes the next couple of messages to be discarded even while there key is k1. So from this example it becomes clear that the keys are not taken into account. If this is actually how it is designed, I'm wondering what the scenario is for this. Especially when I think it is quite common that a topic has messages with different partitions and one parition might have completely different messages with streamtimes (originated from eventtime) from other paritions. Hope you can shed some light on this??


Solution

  • The observed behavior is by design. Obviously, stream-time is tracked across all messages (it's not substream-time).

    The "problem" you see is, that your input data is out-of-order (just putting key and ts):

    (k1, 1), (k1, 2), (k1, 3), (k2, 1), (k2, 2), (k3, 3)
    

    Time does not monotonically increase, ie, records with key k2 are out-of-order with respect to records with key k1. Because you set grace period to zero, you tell Kafka Streams to no allow for unordered data (or actually only some out-of-order data within the window). Hence, the result would only be as you expect it, for an ordered data stream with interleaved keys but monotonically increasing timestamps):

    (k1, 1), (k2, 1), (k1, 2), (k2, 2), (k1, 3), (k3, 3)
    

    If you have out-of-order data, you should set the grace period accordingly high (zero only works for ordered data stream).