apache-kafkastreamapache-kafka-streams

Is the processing order of a Kafka Streams topology specified?


I want to know if the order in which a message is processed by a stream topology is specified.

Example:

        // read input messages

        KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
        inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));

        // check if message was already processed

        KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
        KStream<String, String> newMessages =
                inputMessages.leftJoin(alreadyProcessedMessages, (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
        KStream<String, String> filteredNewMessages =
                newMessages.filter((key, val) -> val != null).peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));

        // process the message

        filteredNewMessages.map((key, value) -> KeyValue.pair(key, "processed message: " + value))
                .peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v)).to("demo_output_topic_1");

With getMessageValueOrNullIfKnownMessage(...):

    private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
        if (messageCounter > 1) {
            return null;
        }

        return newMessageValue;
    }

So there is only one input and one output topic in the example.

The input topic gets counted (thus a local state is created) in alreadyProcessedMessages. Also, the input topic gets joined with the counting table alreadyProcessedMessages and the result of the join is the stream newMessages (the value of the messages in this stream are null if the message count is > 1, otherwise its the original value of the message).

Then, the messages of newMessages get filtered (the null values are filtered out) and the result is written to an output topic.

So what this minimal stream does: It writes all messages from the input topic to the output topic which have a new key (a key that has not been processed before).

In tests that stream works. But I think that is not guaranteed. It only works, because a message is processed first by the counting node before it gets joined.

But is there a guarantee for that order?

As far as I can see in all the documentation, there is no guarantee for this processing order. So if a new message arrive, this could also happen:

  • The message is processed by the "join node".
  • The message is processed by the "counting node".

This would produce a different result of course (so in this case, if a message with the same key comes in the second time it would still be joined with the original value, since it has not been counted yet).

So is the order of processing specified somewhere?

I know that in new versions of Kafka, a KStream-KTable join is done based on the timestamps of the messages in the input partitions. But this does not help here, because the topology uses the same input partition (because its the same message).

Thank you


Solution

  • There is no guarantee. Even if in the current implementation, a List of child nodes is used: https://github.com/apache/kafka/blob/3.6/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L267-L269 -- However, it's not guaranteed that the child nodes are appended to this list in the same order as they are specified in the DSL (because there is a translation layer in between that may add the nodes in a different ordered). Also, the implementation may change at any point in time.

    The only workaround (that is rather expensive) I can think of that might work is, to send the stream-side data in a repartiton topic:

    KStream<String, String> newMessages =
       inputMessages.through(...) // note: as of 2.6.0 release, you could use `repartition()` instead of `through()`
                    .leftJoin(alreadyProcessedMessages, ...);
    

    This way, the KTable will be updated before the join is executed, as the record would need to be read back first. However, as you don't have any guarantee when the record is read back, there might be multiple updates to the table before the join is done what might leave you in a similar situation as before. (Also, its somewhat expensive to reroute the data through an additional topic.)

    Using the Processor API, you would have move control, as you can call context.forward(..., To.child(...)). However, for this case you would also need to implement the aggregation and join manually:

    KStream routing = inputMessages.transform(...);
    routing.groupByKey(...);
    routing.leftJoin(...);
    

    For this case, you get repartition topics after transform() that you want to avoid:

    KStream routing = inputMessages.transform(...);
    routing.transform(...); // implement the aggregation
    routing.transform(...); // implement the join
    

    A consecutive transform() would not trigger an auto-repartitioning.