Tags: scala, apache-kafka, apache-kafka-streams

Kafka Streams: join on ingestion time


I have two topics with very different volumes (something like 1000 events emitted in the left topic for every event in the right topic).

I'm trying to leftJoin those two topics, and I have the impression that the join window is computed over processing time rather than ingestion time, causing the smaller stream to "run out" far too soon.

Is it possible to set the time semantics of a stream-stream join to ingestion time (or event time)?

I could see why it's not such an easy thing to use ingestion time but it seems to be a necessity when processing historical streams.


Solution

  • Kafka Streams joins are based on event time, i.e., whatever the TimestampExtractor returns (by default, the message timestamp as stored in the topic). You cannot change this directly; you can only use a different timestamp extractor to indirectly modify the result.

    Note, though, that the join is executed "eagerly": for every left-side record, the lookup into the right stream is done immediately, which can lead to additional <key, (left-value,null)> results.

    The result also depends on the processing order, which is based on event time on a best-effort basis. The guarantees were improved in the 2.3 release, and setting the config parameter max.task.idle.ms might help to mitigate the issue.

    It's on the mid-term roadmap to improve left- and outer-join behavior to avoid those additional result records. Kafka is an open-source project, so if you would like to pick this up, please help to get it fixed sooner :)

    The only other alternative would be to implement a custom join operator via the Processor API.
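
The two knobs mentioned above, a custom timestamp extractor and max.task.idle.ms, could be wired up roughly as follows. This is only a minimal sketch: the Event payload type, the PayloadTimestampExtractor class, the application id, the broker address, and the 5000 ms idle time are all hypothetical, and it assumes the value serde yields an Event that carries its own timestamp.

```scala
import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.processor.TimestampExtractor

// Hypothetical payload type: assumes each deserialized value carries its own event time.
final case class Event(id: String, eventTimeMs: Long)

// Hypothetical extractor: uses the timestamp embedded in the payload and
// falls back to the timestamp stored with the record in the topic.
class PayloadTimestampExtractor extends TimestampExtractor {
  override def extract(record: ConsumerRecord[AnyRef, AnyRef], partitionTime: Long): Long =
    record.value() match {
      case e: Event => e.eventTimeMs
      case _        => record.timestamp()
    }
}

object JoinTimeConfig {
  def streamsProperties(): Properties = {
    val props = new Properties()
    // Placeholder values; replace with your own application id and brokers.
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-on-event-time")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // The join windows are evaluated against whatever this extractor returns.
    props.put(
      StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
      classOf[PayloadTimestampExtractor].getName
    )
    // Let a task wait up to 5 s (an arbitrary value) for data on an empty input
    // partition before processing, so the sparse right topic is less likely to be
    // skipped over while the busy left topic races ahead.
    props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "5000")
    props
  }
}
```

The resulting Properties would be passed to the KafkaStreams constructor together with the topology. If ingestion time is what you want for newly written data, the topic-level setting message.timestamp.type=LogAppendTime makes the broker stamp each record at append time, which the default extractor then picks up; it does not rewrite timestamps of records already in the topic.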