apache-kafka apache-kafka-streams

Event-Time merge of two Kafka topics using Kafka Streams DSL


I am looking for a way to merge two Kafka topics based on the event time.

For example, I have two topics whose records follow the schema {event-key} :: {event-time-as-value}:

topic I -  { {1 :: 12:00pm} {2 :: 12:10pm} {3 :: 14:50pm} {4 :: 15:00pm} }
topic II - { {1 :: 13:00pm} {2 :: 13:10pm} {3 :: 15:50pm} {4 :: 16:00pm} }

The expected output should look like this:

{ {1 :: 12:00pm} {2 :: 12:10pm} {1 :: 13:00pm} {2 :: 13:10pm} {3 :: 14:50pm} {4 :: 15:00pm} {3 :: 15:50pm} {4 :: 16:00pm} }

Is there a way to do it using Kafka Streams DSL?

Note: There is a good chance that the original topics are not ordered by event time, and that is OK. I would like the algorithm to always pick the earlier of the two events that are currently at the head of each topic (the same way the merge step for two sorted arrays works).
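To make the requested behavior concrete, here is a minimal plain-Java sketch of that head-of-queue merge. It does not use Kafka at all; the class `HeadMergeSketch`, the `Event` type, and the choice to store event time as minutes since midnight are assumptions made purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration class; not part of Kafka or Kafka Streams.
final class HeadMergeSketch {

    // One event: a key plus its event time (assumed: minutes since midnight).
    static final class Event {
        final int key;
        final int minuteOfDay;

        Event(int key, int minuteOfDay) {
            this.key = key;
            this.minuteOfDay = minuteOfDay;
        }

        @Override
        public String toString() {
            return key + "@" + minuteOfDay;
        }
    }

    // Repeatedly emits whichever queue head has the smaller event time.
    // Ties go to the first input. Each input is consumed in its given order,
    // so an unsorted input yields a best-effort result, not a full sort.
    static List<Event> merge(List<Event> first, List<Event> second) {
        Deque<Event> a = new ArrayDeque<>(first);
        Deque<Event> b = new ArrayDeque<>(second);
        List<Event> out = new ArrayList<>(a.size() + b.size());
        while (!a.isEmpty() && !b.isEmpty()) {
            if (a.peekFirst().minuteOfDay <= b.peekFirst().minuteOfDay) {
                out.add(a.pollFirst());
            } else {
                out.add(b.pollFirst());
            }
        }
        out.addAll(a); // at most one of the two queues still has elements
        out.addAll(b);
        return out;
    }
}
```

Feeding the two example topics above into `merge` (with 12:00 encoded as 720, 12:10 as 730, and so on) produces the events in the order shown in the expected output.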


Solution

  • Kafka Streams (as of version 2.1.0) implements the exact algorithm you describe. Hence, a simple:

    // Requires java.util.Arrays and org.apache.kafka.streams.StreamsBuilder.
    StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream(Arrays.asList("firstInputTopic", "secondInputTopic"))
        .to("outputTopicName");
    

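    should do what you want. Note that the program will merge data on a per-partition basis: records from partition N of both input topics are merged with each other, and no ordering is established across different partitions.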

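    Also consider the max.task.idle.ms configuration, which controls how long a task waits for data on an empty input partition before it continues processing the partitions that already have buffered records.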

    For more details read the corresponding KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

    Additionally, you need to implement and configure a custom TimestampExtractor that gets the timestamp from the value.
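
The two configuration points above could be wired up roughly as follows. This is only a sketch using the JDK, so it compiles without Kafka on the classpath: the config keys `max.task.idle.ms` and `default.timestamp.extractor` are the real Kafka Streams names, but the class `EventTimeConfigSketch`, its method names, and the assumption that each value is a 24-hour `HH:mm` string in UTC on a known date are illustrative. Other required settings such as `application.id` and `bootstrap.servers` are omitted.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Properties;

// Hypothetical helper class; not part of Kafka Streams.
final class EventTimeConfigSketch {

    // Converts a record value such as "14:50" into epoch milliseconds.
    // Assumption: values are 24-hour HH:mm strings, interpreted as UTC on `date`.
    // A custom extractor would call this from
    // TimestampExtractor#extract(ConsumerRecord<Object, Object> record, long partitionTime)
    // and return the result as the record's timestamp.
    static long toEpochMillis(String value, LocalDate date) {
        LocalTime time = LocalTime.parse(value); // ISO local time accepts "HH:mm"
        return date.atTime(time).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Builds the two settings discussed in the answer.
    // extractorClassName is the fully qualified name of your own extractor class.
    static Properties streamsProperties(String extractorClassName, long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("default.timestamp.extractor", extractorClassName);
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }
}
```

The resulting `Properties` object would be passed, together with the remaining settings, to the `KafkaStreams` constructor. A larger `max.task.idle.ms` trades latency for a better chance that both partition heads are available when the next record is chosen.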