
Sort order for KStream join


If I want to perform a join between two KStreams, e.g.:

KStream<String, String> mergedStream = streamA.outerJoin(streamB,
    (value1, value2) -> value2 == null ? value1 : value2, // prefer streamB's value when present
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)));

and I want the output to be sorted in the same order as streamA, how would I go about doing that? (assuming all topics have a single partition)

Here is an example of what I am trying to achieve:

Stream A: A,B,C,D,E,F

Stream B: F',E',D',B',A'

Merged Stream: A',B',C,D',E',F'

This may be complicated by Stream B using a producer which writes timestamps that may be in the past at the time of writing - so the write order might be different to the order when sorted by timestamp. If it makes a difference, Stream A should always be written in timestamp order.

Is there a rule governing the output order (or how timestamps are handled) for an operation like this, and if so, what is it? Alternatively, how can I order the output to follow Stream A? In the tests I've run, the output seems to follow Stream B's order by default.


Solution

  • I wrote a custom stream processor that stores the messages in a state store and uses a punctuator to sort and forward them. This only sorts within each punctuation interval, but as long as you know your data it works well enough.

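    // Supplier and serdes are elided as in the original; the value serde must handle the list type.
    StoreBuilder<KeyValueStore<String, ArrayList<KeyValue<String, Long>>>> storeBuilder = Stores.keyValueStoreBuilder(...);
    builder.addStateStore(storeBuilder);
    stream.process(() -> new ContextualProcessor<String, String, String, String>() {
        // key -> list of (value, timestamp) pairs buffered since the last punctuation
        private KeyValueStore<String, ArrayList<KeyValue<String, Long>>> store;

        @Override
        public void init(ProcessorContext<String, String> context) {
            super.init(context);
            store = context.getStateStore("storeName");
            // every 30 seconds of wall-clock time, call punctuate
            context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
        }

        @Override
        public void process(Record<String, String> record) {
            // append to the list already buffered for this key, if any
            ArrayList<KeyValue<String, Long>> buffered = store.get(record.key());
            if (buffered == null) {
                buffered = new ArrayList<>();
            }
            buffered.add(KeyValue.pair(record.value(), record.timestamp()));
            store.put(record.key(), buffered);
        }

        void punctuate(long timestamp) {
            // for each entry in the store, sort by timestamp, forward with context().forward(r),
            // then delete the forwarded entries so they are not emitted again
        }

    }, "storeName"); // passing the store name connects the state store to this processor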
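The sorting step that the punctuator comment describes can be sketched without any Kafka dependency. This is only a minimal illustration, not the answerer's actual implementation: `TimestampSortBuffer`, `add`, and `flush` are hypothetical names, the buffer is a plain in-memory list rather than a state store, and the timestamps 1 to 6 are made up to match the A to F example from the question.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper (not a Kafka Streams class): buffers values and releases them in timestamp order. */
final class TimestampSortBuffer {

    /** A buffered value together with its record timestamp. */
    private static final class Entry {
        final String value;
        final long timestamp;

        Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<Entry> buffer = new ArrayList<>();

    /** Called once per incoming record, playing the role of process() above. */
    void add(String value, long timestamp) {
        buffer.add(new Entry(value, timestamp));
    }

    /** Called on each punctuation: returns the buffered values sorted by timestamp and empties the buffer. */
    List<String> flush() {
        // List.sort is stable, so records with equal timestamps keep their arrival order.
        buffer.sort(Comparator.comparingLong((Entry e) -> e.timestamp));
        List<String> ordered = new ArrayList<>();
        for (Entry e : buffer) {
            ordered.add(e.value);
        }
        buffer.clear();
        return ordered;
    }

    public static void main(String[] args) {
        TimestampSortBuffer sorter = new TimestampSortBuffer();
        // Assumed arrival order (reverse of Stream A), with made-up timestamps 1..6 for A..F.
        sorter.add("F'", 6L);
        sorter.add("E'", 5L);
        sorter.add("D'", 4L);
        sorter.add("C", 3L);
        sorter.add("B'", 2L);
        sorter.add("A'", 1L);
        System.out.println(sorter.flush()); // prints [A', B', C, D', E', F']
    }
}
```

In the real processor, `flush()` would correspond to the body of `punctuate`, with each sorted entry passed to `context().forward(...)`. A record that arrives after the interval containing its timestamp has already been flushed is still emitted late, which is the "only sorts within a time window" limitation mentioned above.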