If I want to perform a join between two KStreams, e.g.:
KStream<String, String> mergedStream = streamA.outerJoin(streamB,
    // prefer the value from streamB when it exists, otherwise keep streamA's value
    (value1, value2) -> value2 == null ? value1 : value2,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30)));
and I want the output to be sorted in the same order as streamA, how would I go about doing that? (assuming all topics have a single partition)
Here is an example of what I am trying to achieve:
Stream A: A,B,C,D,E,F
Stream B: F',E',D',B',A'
Merged Stream: A',B',C,D',E',F'
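To make the target concrete, here is a minimal plain-Java model of the result I am after (no Kafka involved). It assumes that A and A' share the same key, and the class and method names are made up for illustration only:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedMergeOrder {
    // Hypothetical helper: walks stream A in its original order and, for each key,
    // prefers the stream B value when one exists (same rule as the join lambda).
    static List<String> mergeInOrderOfA(List<String> keysOfA, Map<String, String> valuesOfB) {
        List<String> merged = new ArrayList<>();
        for (String key : keysOfA) {
            String fromB = valuesOfB.get(key);
            merged.add(fromB == null ? key : fromB);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> streamA = List.of("A", "B", "C", "D", "E", "F");
        // Stream B arrives in a different order and has no record for C.
        Map<String, String> streamB = new LinkedHashMap<>();
        for (String key : List.of("F", "E", "D", "B", "A")) {
            streamB.put(key, key + "'");
        }
        System.out.println(mergeInOrderOfA(streamA, streamB));
        // prints [A', B', C, D', E', F']
    }
}
```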
This may be complicated by Stream B using a producer which writes timestamps that may be in the past at the time of writing - so the write order might be different to the order when sorted by timestamp. If it makes a difference, Stream A should always be written in timestamp order.
Is there a rule for the output order (or for how timestamps are handled) of a join operation, and if so, what is it? Alternatively, how can I order the output based on Stream A? In the tests I've run, the output seems to follow Stream B's order by default.
I wrote a custom stream processor that stores the messages in a state store and uses a punctuator to sort them by timestamp. This only sorts within a time window, but as long as you know your data (in particular, how late a record can arrive) it works well enough.
// Processor, ProcessorContext, Record and ContextualProcessor come from org.apache.kafka.streams.processor.api
StoreBuilder<KeyValueStore<String, ArrayList<KeyValue<String, Long>>>> storeBuilder = Stores.keyValueStoreBuilder(...);
builder.addStateStore(storeBuilder);
stream.process(() -> new ContextualProcessor<String, String, String, String>() {
    private KeyValueStore<String, ArrayList<KeyValue<String, Long>>> store; // key -> list of (value, timestamp)

    @Override
    public void init(ProcessorContext<String, String> context) {
        super.init(context);
        store = context.getStateStore("storeName");
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::punctuate); // every 30 seconds, call punctuate
    }

    @Override
    public void process(Record<String, String> record) {
        // append the value and its timestamp to the list buffered for this key
        ArrayList<KeyValue<String, Long>> buffered = store.get(record.key());
        if (buffered == null) {
            buffered = new ArrayList<>();
        }
        buffered.add(KeyValue.pair(record.value(), record.timestamp()));
        store.put(record.key(), buffered);
    }

    void punctuate(long timestamp) {
        // for each in store, sort by timestamp, then use context().forward(r);
    }
}, "storeName"); // the store must be named here so the processor can access it
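As a rough sketch of what the punctuate step could do, the plain-Java example below flattens the buffered entries, sorts them by timestamp, and clears the buffer. It deliberately uses an in-memory `Map` in place of the state store, and `SortBufferSketch`, `Buffered`, and `drainSorted` are hypothetical names introduced only for this illustration:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SortBufferSketch {
    // Hypothetical stand-in for one buffered record: key, value and record timestamp.
    static final class Buffered {
        final String key;
        final String value;
        final long timestamp;

        Buffered(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Flattens every buffered list, orders the result by timestamp, and empties the buffer.
    // List.sort is stable, so records with equal timestamps keep the order in which they were collected.
    static List<Buffered> drainSorted(Map<String, List<Buffered>> store) {
        List<Buffered> all = new ArrayList<>();
        for (List<Buffered> perKey : store.values()) {
            all.addAll(perKey);
        }
        all.sort(Comparator.comparingLong((Buffered b) -> b.timestamp));
        store.clear(); // a real processor would delete the forwarded keys from the state store instead
        return all;
    }

    public static void main(String[] args) {
        Map<String, List<Buffered>> store = new HashMap<>();
        store.put("A", new ArrayList<>(List.of(new Buffered("A", "a", 300L))));
        store.put("B", new ArrayList<>(List.of(new Buffered("B", "b", 100L))));
        store.put("C", new ArrayList<>(List.of(new Buffered("C", "c", 200L))));

        List<String> emittedKeys = new ArrayList<>();
        for (Buffered b : drainSorted(store)) {
            emittedKeys.add(b.key); // the real processor would forward the record downstream here
        }
        System.out.println(emittedKeys); // prints [B, C, A]
    }
}
```

In the actual processor the same steps would presumably run against the state store: iterate over `store.all()` (closing the iterator afterwards), sort the collected entries, call `context().forward(...)` with a `Record` built from each key, value and timestamp, and then delete the forwarded keys from the store.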