I'm trying to monitor, in my Kafka Streams topology, the processing time of the messages coming from a specific input topic. For this purpose, I've added a transformer at the end of my topology that reads the context timestamp of each message and records the elapsed time. This works perfectly for the simple linear, single-input-topic case.
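The monitoring step described above comes down to subtracting the record's timestamp (what `context.timestamp()` returns inside a transformer) from the current wall-clock time. The following is a minimal, dependency-free sketch of that bookkeeping. The class and method names are hypothetical. In a real topology, the `record(...)` call would sit inside the transformer's `transform` method, with `System.currentTimeMillis()` passed as `nowMs`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.LongSummaryStatistics;

/** Hypothetical helper that records end-to-end processing latency per message. */
public class LatencyMonitor {
    private final List<Long> samples = new ArrayList<>();

    /**
     * Stores and returns the elapsed time between the record timestamp
     * (e.g. context.timestamp() inside a transformer) and the current time.
     */
    public long record(long recordTimestampMs, long nowMs) {
        long elapsed = nowMs - recordTimestampMs;
        samples.add(elapsed);
        return elapsed;
    }

    /** Count, min, max and average of all recorded latencies. */
    public LongSummaryStatistics stats() {
        return samples.stream().mapToLong(Long::longValue).summaryStatistics();
    }

    public static void main(String[] args) {
        LatencyMonitor monitor = new LatencyMonitor();
        monitor.record(1_000L, 1_250L); // 250 ms
        monitor.record(2_000L, 2_100L); // 100 ms
        LongSummaryStatistics s = monitor.stats();
        System.out.println(s.getCount() + " samples, max " + s.getMax() + " ms");
    }
}
```

Passing `nowMs` explicitly instead of reading the clock inside `record` keeps the calculation deterministic and easy to test.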
However, let's assume I have a topology in which the records from Topic A and Topic B are joined before the result is written to Out.
Suppose I want to monitor the A -> Out path but not B -> Out. The obstacle is that, because the messages have gone through a join, the context topic the transformer sees is neither Topic A nor Topic B but the join's internal topic, so I cannot use this context information to tell them apart. Therefore the question is:
What is the proper way to flag messages from Topic A?
One possibility is to add a custom header to the message context. In the proposed example, we could add a transformer right after consuming from Topic A that sets a specific header. The monitoring transformer after the join would then use that header as its filtering condition.
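The following is a dependency-free model of the header-based approach. It assumes a simplified record type whose header map stands in for Kafka's `Headers`, where header values are raw `byte[]`. It also uses a hypothetical header name, `x-origin`. The model assumes the flag survives the join, meaning the joined record carries the headers of the Topic A record. Verify this for the join type you actually use, because the headers seen after a join may depend on which side's record triggered the output.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of flagging records through a header. */
public class HeaderFlag {
    /** Hypothetical header name and value used as the origin flag. */
    public static final String ORIGIN_HEADER = "x-origin";
    public static final String TOPIC_A = "topic-a";

    /** Stand-in for a record: a value plus headers stored as raw bytes, as in Kafka. */
    public static final class Rec {
        public final String value;
        public final Map<String, byte[]> headers = new HashMap<>();
        public Rec(String value) { this.value = value; }
    }

    /** Step placed right after consuming Topic A: mark the record's origin. */
    public static Rec tagFromTopicA(Rec rec) {
        rec.headers.put(ORIGIN_HEADER, TOPIC_A.getBytes(StandardCharsets.UTF_8));
        return rec;
    }

    /** Condition used by the monitoring step after the join: decode the header bytes and compare. */
    public static boolean isFromTopicA(Rec rec) {
        byte[] raw = rec.headers.get(ORIGIN_HEADER);
        return raw != null && TOPIC_A.equals(new String(raw, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Rec a = tagFromTopicA(new Rec("from A"));
        Rec b = new Rec("from B"); // records from Topic B are never tagged
        System.out.println(isFromTopicA(a) + " " + isFromTopicA(b));
    }
}
```

With the real Processor API, the tagging and checking steps would use `record.headers().add(key, bytes)` and `record.headers().lastHeader(key)` instead of the map. Note that Kafka headers allow repeated keys, which this map-based model does not.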
Another possibility is to flag the message in its content. In this case, the transformer after consuming from Topic A would extend the message value with a boolean flag. That flag would then have to be propagated through the join (the joiner must copy it into the joined value) all the way to the monitoring transformer.
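The following is a minimal model of carrying the flag inside the value. The wrapper type `Flagged` and the joiner are hypothetical. In Kafka Streams, the joiner's role is played by the `ValueJoiner` passed to `join(...)`. The point the sketch illustrates is that the joiner has to copy the flag explicitly, otherwise the flag is lost at the join.

```java
import java.util.function.BiFunction;

/** Hypothetical model of carrying an origin flag inside the message value through a join. */
public class ValueFlag {
    /** Value wrapper that extends the original payload with a boolean flag. */
    public static final class Flagged<V> {
        public final V payload;
        public final boolean fromTopicA;

        public Flagged(V payload, boolean fromTopicA) {
            this.payload = payload;
            this.fromTopicA = fromTopicA;
        }
    }

    /** Step placed right after consuming Topic A: wrap the value and set the flag. */
    public static <V> Flagged<V> flagFromTopicA(V value) {
        return new Flagged<>(value, true);
    }

    /** Joiner combining the flagged Topic A value with a Topic B value; it copies the flag over. */
    public static final BiFunction<Flagged<String>, String, Flagged<String>> JOINER =
            (left, right) -> new Flagged<>(left.payload + "|" + right, left.fromTopicA);

    public static void main(String[] args) {
        Flagged<String> joined = JOINER.apply(flagFromTopicA("a1"), "b1");
        System.out.println(joined.payload + " " + joined.fromTopicA);
    }
}
```

Because the flag is part of the value, the value serde for the flagged stream also has to handle the wrapper type.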
Both options have pros and cons, and the choice depends on the specific use case. Flagging via context headers has minimal impact on the rest of the topology, but evaluating the filtering condition requires reading and decoding the header's raw bytes. Flagging the message content itself makes the condition simpler to evaluate, but may require refactoring or adapting other parts of the topology, such as the value serdes and the joiner.