I'm performing message enrichment through a KStream-KTable left join using the Kafka Streams DSL. Everything has worked smoothly except for one subtle problem.
In the current architecture, we receive messages on a topic (placements, the KStream) that need to be enriched with data from a compacted topic (descriptions, the KTable). The messages look something like this:
{
  "order_id": 123456789,
  "user_id": 987654,
  "placed_at": "2020-07-20T11:31:00",
  "amount": 5.79,
  "items": [
    {"item_id": 13579, "quantity": 1, "price": 1.23},
    {"item_id": 24680, "quantity": 1, "price": 4.56}
  ]
}
My current approach is to take the incoming message from placements, split it into N messages (N being the length of the items array), perform the left join on item_id to add the item description, and then group the resulting stream by order_id (the key of the enriched split messages) to reconstruct the full message.
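To make that flow concrete, here is a minimal plain-Java sketch of the three steps with no Kafka involved. The record types, the method names split, leftJoin and regroup, and the use of a Map as a stand-in for the descriptions table are all hypothetical; in the real topology these steps would roughly correspond to a flatMap, a KStream-KTable leftJoin and a groupBy/aggregate. Because the sketch processes a single batch at once, it only illustrates the data shapes, not the timing behaviour.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the split -> left join -> regroup flow (no Kafka involved).
class SplitJoinRegroup {

    // Hypothetical types modelled on the JSON message in the question (Java 16+ records).
    record Item(long itemId, int quantity, double price) {}
    record Order(long orderId, List<Item> items) {}
    // One record per item: joined on item_id, carries order_id for regrouping.
    record ItemRecord(long orderId, Item item, String description) {}

    // Split: one order with N items becomes N records (like flatMap in the DSL).
    static List<ItemRecord> split(Order order) {
        List<ItemRecord> out = new ArrayList<>();
        for (Item item : order.items()) {
            out.add(new ItemRecord(order.orderId(), item, null));
        }
        return out;
    }

    // Left join on item_id: the description stays null if the table has no entry yet.
    static ItemRecord leftJoin(ItemRecord rec, Map<Long, String> descriptions) {
        return new ItemRecord(rec.orderId(), rec.item(), descriptions.get(rec.item().itemId()));
    }

    // Regroup by order_id to rebuild each order from its enriched item records.
    static Map<Long, List<ItemRecord>> regroup(List<ItemRecord> records) {
        Map<Long, List<ItemRecord>> byOrder = new LinkedHashMap<>();
        for (ItemRecord rec : records) {
            byOrder.computeIfAbsent(rec.orderId(), k -> new ArrayList<>()).add(rec);
        }
        return byOrder;
    }
}
```

With the sample order and only item 13579 present in the map, regroup returns a single group for order 123456789 holding two records, the second of which has a null description.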
The problem is that the descriptions may arrive with a delay of a few seconds, so on rare occasions I get a reconstructed message with fewer items than the original, unenriched one.
I've looked at the approach in the custom join example. It comes close to what I need, but unfortunately it doesn't fit completely: in my case, if the description of even a single item is missing, the whole message should be delayed. I can't figure out how to proceed in this situation. Any suggestions are welcome.
After carefully analyzing the custom join example, I found that the solution is to change its logic slightly.
Below is an excerpt from the example:
private static final class StreamTableJoinStreamSideLogic
    implements TransformerSupplier<String, Double, KeyValue<String, Pair<Double, Long>>> {
  /* ... */
  private KeyValue<String, Pair<Double, Long>> sendFullJoinRecordOrWaitForTableSide(final String key,
                                                                                      final Double value,
                                                                                      final long streamRecordTimestamp) {
    // Look up the table-side value (and its timestamp) for the stream record's key
    final ValueAndTimestamp<Long> tableValue = tableStore.get(key);
    // Join only if the table value exists and its timestamp is within the accepted bounds
    if (tableValue != null &&
        withinAcceptableBounds(Instant.ofEpochMilli(tableValue.timestamp()), Instant.ofEpochMilli(streamRecordTimestamp))) {
      final KeyValue<String, Pair<Double, Long>> joinRecord = KeyValue.pair(key, new Pair<>(value, tableValue.value()));
      LOG.info("Table data available for key {}, sending fully populated join message {}", key, joinRecord);
      return joinRecord;
    } else {
      // No usable table value: emit the stream value with a null table side
      LOG.info("Table data unavailable for key {}, sending the join result as null", key);
      return KeyValue.pair(key, new Pair<>(value, null));
    }
  }
  /* ... */
}
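In particular, the method sendFullJoinRecordOrWaitForTableSide() needs to be modified to apply the same lookup to every item in an all-or-none fashion: the full join record is sent only if a description is available for all items; otherwise the whole order waits for the table side.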
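A minimal sketch of that all-or-none check follows, written in plain Java (16+, for records) rather than against the Kafka Streams API. It assumes a HashMap standing in for the table-side store and a second map standing in for the buffer of orders waiting on the table side; the class and method names (AllOrNoneJoin, sendFullJoinRecordOrWait, retryPending, onDescription) are hypothetical, and the timestamp bounds check and any maximum waiting time from the original example are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the all-or-none lookup (hypothetical names, no Kafka involved).
class AllOrNoneJoin {

    // Hypothetical types modelled on the JSON message in the question.
    record Item(long itemId, int quantity, double price) {}
    record EnrichedItem(Item item, String description) {}
    record Order(long orderId, List<Item> items) {}
    record EnrichedOrder(long orderId, List<EnrichedItem> items) {}

    private final Map<Long, String> descriptions = new HashMap<>();  // stand-in for tableStore
    private final Map<Long, Order> pending = new LinkedHashMap<>();  // orders waiting for the table side

    // Table side: a description arrives, possibly seconds after the order.
    void onDescription(long itemId, String description) {
        descriptions.put(itemId, description);
    }

    // Returns the enriched order only if EVERY item has a description;
    // otherwise buffers the whole order and returns empty.
    Optional<EnrichedOrder> sendFullJoinRecordOrWait(Order order) {
        List<EnrichedItem> enriched = new ArrayList<>();
        for (Item item : order.items()) {
            String description = descriptions.get(item.itemId());
            if (description == null) {
                pending.put(order.orderId(), order);  // a single missing item delays the full order
                return Optional.empty();
            }
            enriched.add(new EnrichedItem(item, description));
        }
        pending.remove(order.orderId());  // complete: no longer waiting
        return Optional.of(new EnrichedOrder(order.orderId(), enriched));
    }

    // Retries every waiting order and returns those that are now complete.
    List<EnrichedOrder> retryPending() {
        List<EnrichedOrder> ready = new ArrayList<>();
        for (Order order : new ArrayList<>(pending.values())) {  // copy: the loop modifies pending
            sendFullJoinRecordOrWait(order).ifPresent(ready::add);
        }
        return ready;
    }
}
```

With the sample order from the question, sendFullJoinRecordOrWait returns an empty Optional while the description of item 24680 is missing; once onDescription(24680, ...) has been called, retryPending() returns the order with both items enriched. In a real Transformer the retry would presumably be driven by a punctuation scheduled through ProcessorContext.schedule(), and the buffer would be a state store so that waiting orders survive restarts.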