I'm developing a Quarkus microservice that uses Kafka Streams to process messages from multiple topics. Specifically, I'm joining a KStream with a KTable derived from two of these topics. Under normal conditions, the join works as expected. Under increased load, however, the join occasionally fails to produce a result, even though both messages appear to be processed: the messages backing the KTable are processed slightly after the ones from the KStream, even though the actual timestamp of the produced message is earlier.
Code Example:
KStream<String, OperationAvro> operationKStream = streamsBuilder.stream(operationTopic);
KStream<String, ContextAvro> contextKStream = streamsBuilder.stream(contextTopic);
KTable<String, ContextAvro> contextKTable = contextKStream.toTable(Materialized.as(contextStoreName));

// both the KTable and the KStream use a custom processor which logs each message and its timestamp
KStream<String, Result> processingResultStream = operationKStream
        .filter((key, value) -> isEligible(value))
        .join(
                contextKTable,
                (operation, context) -> topicsJoiner(operation, context),
                Joined.with(Serdes.String(), new ValueAndTraceSerde<>(getSpecificAvroSerde()), getSpecificAvroSerde())
        )
        .peek(this::logTheOutputOfJoiner)
        .mapValues(handler::processTheJoinedData);
My question is: why does this happen? The timestamps of the messages appear to be in the correct order, yet the messages from the KStream are sometimes processed before the ones backing the KTable, which causes the join to be skipped.
Finally, I got it :)
The problem is related to the timestamps of the resulting messages in the topology.
Setup Overview
The project follows a sequential message-processing architecture where services emit and process messages in a chain. Each service joins its input message with a context message emitted by the first service in the chain. For example (one link of the chain is sketched in code after this list):
Service A emits messageA (context message).
Service B reads messageA, processes it, and emits messageB.
Service C reads messageB, joins it with messageA, and emits messageC.
Service D reads messageC, joins it with messageA, and emits messageD.
And so on...
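In Kafka Streams terms, one link of this chain looks roughly as follows. This is a minimal sketch; the topic names, message types, and the buildMessageC helper are illustrative, not part of the actual project:

KStream<String, MessageB> inputStream = streamsBuilder.stream(messageBTopic);
KTable<String, MessageA> contextTable = streamsBuilder.table(messageATopic);

// Service C: join the incoming messageB with the context messageA and emit messageC.
inputStream
        .join(contextTable, (messageB, messageA) -> buildMessageC(messageB, messageA))
        .to(messageCTopic);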
The Issue
Under normal load (when the services are not busy), messages are processed almost immediately after they are emitted. In this case, the topology processes messageA first, followed by messageB, messageC, and so on. This ensures that the joins with the context (messageA) happen as expected.
However, under high load, Kafka Streams processes messages based on their timestamps, and the following issue occurs:
Resulting messages (messageB, messageC, messageD) are emitted with the same timestamp as messageA. When there are many messages to process, Kafka Streams prioritizes processing by timestamp. As a result, the order of processing becomes unpredictable (see the sketch after this list):
Sometimes messageA is processed first, enabling the joins in downstream services.
Other times messageB, messageC, or messageD is processed first, causing joins to fail because messageA is not yet available.
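The identical timestamps are a consequence of Kafka Streams' default behavior: a record emitted by a stateless operation keeps the timestamp of the input record it was derived from. A minimal sketch of the effect (the stream and handler names here are illustrative):

// By default, the output record of operations such as mapValues() carries
// the timestamp of the input record. If messageB is derived from messageA,
// messageB is therefore produced with messageA's timestamp.
KStream<String, MessageB> messageBStream = messageAStream
        .mapValues(serviceB::process); // emitted messageB keeps messageA's timestamp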
Fix
The fix consists of overwriting each resulting message's timestamp with the current wall-clock time:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;

import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// 'log' is assumed to be provided by the surrounding project, e.g. via Lombok's @Slf4j.
abstract class TimestampUpdaterProcessor<T> extends ContextualFixedKeyProcessor<String, T, T> {

    // Hook for subclasses to handle each record before it is forwarded.
    public abstract void processRecord(FixedKeyRecord<String, T> kafkaRecord);

    @Override
    public final void process(FixedKeyRecord<String, T> kafkaRecord) {
        processRecord(kafkaRecord);
        context().forward(withUpdatedTimestamp(kafkaRecord));
    }

    protected FixedKeyRecord<String, T> withUpdatedTimestamp(FixedKeyRecord<String, T> kafkaRecord) {
        return Optional.ofNullable(kafkaRecord)
                .map(this::updateTimestamp)
                .orElse(null);
    }

    private FixedKeyRecord<String, T> updateTimestamp(FixedKeyRecord<String, T> kafkaRecord) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
        long originalTimestamp = kafkaRecord.timestamp();
        String originalFormattedTime = sdf.format(new Date(originalTimestamp));
        long currentTimestamp = System.currentTimeMillis();
        String currentFormattedTime = sdf.format(new Date(currentTimestamp));
        log.trace("Updating timestamp for record with key: {}, originalTimestamp: {}, currentTimestamp: {}, value: {}",
                kafkaRecord.key(), originalFormattedTime, currentFormattedTime,
                kafkaRecord.value() != null ? kafkaRecord.value().toString() : "null");
        // Re-stamp the record with "now" so downstream services see it as newer than the context message.
        return kafkaRecord.withTimestamp(currentTimestamp);
    }
}
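To apply it, the processor can be attached to a stream with processValues() right before the result is written out. A minimal sketch, assuming a no-op concrete subclass and a hypothetical resultTopic (processValues() requires Kafka Streams 3.3+):

processingResultStream
        .processValues(() -> new TimestampUpdaterProcessor<Result>() {
            @Override
            public void processRecord(FixedKeyRecord<String, Result> kafkaRecord) {
                // no extra per-record handling needed; the base class re-stamps and forwards
            }
        })
        .to(resultTopic); // resultTopic is a placeholder for the actual output topic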