The problem I want to solve starts as a basic one: I want to enrich a stream message with data available in another stream. When I start the application, I want to make sure the secondary stream is loaded so I don't start reading from an empty store. A `KTable` solves this problem when I join my stream with it, because there is a correlation/sync between the timestamps of both sources.
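To make the timestamp synchronization concrete, here is a plain-Java model of a stream-table inner join. It is not the Kafka Streams API and has no Kafka dependency. `run` merges both inputs and processes them in timestamp order, which is roughly what Kafka Streams attempts, on a best-effort basis, for a `KStream`-`KTable` join. `runStreamFirst` instead handles every primary record before the table is loaded, which is the empty-store situation described below. The class and record names, the tie-breaking rule, and the `+` output format are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Kafka Streams) of a timestamp-synchronized stream-table join.
public class TimestampOrderedJoin {

    // One input record; fromTable marks records from the secondary (table) topic.
    record Rec(long timestamp, boolean fromTable, String key, String value) {}

    // Applies records in the given order: table records upsert the latest value per key,
    // stream records are inner-joined against whatever the table holds at that moment.
    static List<String> process(List<Rec> ordered) {
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Rec r : ordered) {
            if (r.fromTable()) {
                table.put(r.key(), r.value());
            } else {
                String right = table.get(r.key());
                if (right != null) {
                    out.add(r.value() + "+" + right);
                }
            }
        }
        return out;
    }

    // Synchronized: merge both inputs and sort by timestamp
    // (on ties, table records go first; this tie rule is a modeling choice).
    static List<String> run(List<Rec> streamRecs, List<Rec> tableRecs) {
        List<Rec> all = new ArrayList<>(streamRecs);
        all.addAll(tableRecs);
        all.sort((a, b) -> {
            int byTime = Long.compare(a.timestamp(), b.timestamp());
            return byTime != 0 ? byTime : Boolean.compare(b.fromTable(), a.fromTable());
        });
        return process(all);
    }

    // Unsynchronized: every primary record is handled before the table is loaded.
    static List<String> runStreamFirst(List<Rec> streamRecs, List<Rec> tableRecs) {
        List<Rec> all = new ArrayList<>(streamRecs);
        all.addAll(tableRecs);
        return process(all);
    }

    public static void main(String[] args) {
        List<Rec> stream = List.of(new Rec(10, false, "k1", "order"));
        List<Rec> table = List.of(new Rec(5, true, "k1", "customer"));
        System.out.println(run(stream, table));            // prints [order+customer]
        System.out.println(runStreamFirst(stream, table)); // prints []
    }
}
```

The only difference between the two runs is the processing order, which is exactly what the question says is lost once the lookup moves out of the join.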
The problem is that I need access to the message headers, which I don't get in a `join` operation. So it seems I cannot use a `join` and need a different operation instead, such as a `transform`. There I can read the headers and query a store. This store is fed by the secondary topic (the one I was previously using to feed the `KTable`).
The problem now: how do I wait for the store to load before processing new messages? In other words, there is no explicit relationship between the two topics anymore (as there was when using a `KTable`). So when I start the application for the first time and receive the first message on the primary stream, looking up correlated data in the store (fed by the secondary stream) finds nothing. How do I avoid this?
I guess you know that you can do anything you want, like waiting and checking timestamps, if you use a lower-level API (like the consumer/producer).
With the higher-level API, a `join` is indeed the way I know to force this kind of synchronization between streams, and using a `transform` that accesses the store removes that possibility, as you described.
What you could do is avoid doing everything in a single `transform`. Instead, `transform` your stream only to enrich each message with explicit fields for the headers you need. Then `join` the result with the `KTable` you mentioned (the joiner can access all the headers because they are now part of the message).
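Here is a plain-Java model of this two-step idea, again without the Kafka dependency. `copyHeader` plays the role of the first step. In Kafka Streams, that step would be something like `transformValues` with a `ValueTransformerWithKey` that reads `context.headers()`. `enrichAndJoin` plays the role of `KStream#join(KTable, ValueJoiner)`, where the joiner can now read the header because it travels in the value. The header name `trace-id`, the record types, and the output format are hypothetical. Headers are simplified to one value per name, whereas Kafka allows repeated header keys. The table is a fixed snapshot, so the timestamp ordering that the real join provides is not modeled here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Kafka Streams) of "copy headers into the value, then join".
public class EnrichThenJoin {

    // Primary-topic record with headers (simplified: one value per header name).
    record Message(String key, String value, Map<String, byte[]> headers) {}

    // Step 1 output: the original value plus the needed header as an explicit field.
    record ValueWithHeader(String value, String traceId) {}

    // Step 1: copy the (hypothetical) "trace-id" header into the value.
    // A missing header becomes null in the model.
    static ValueWithHeader copyHeader(Message m, String headerName) {
        byte[] raw = m.headers().get(headerName);
        String header = raw == null ? null : new String(raw, StandardCharsets.UTF_8);
        return new ValueWithHeader(m.value(), header);
    }

    // Step 2 joiner: it sees the header only because step 1 put it in the value.
    static String joiner(ValueWithHeader left, String right) {
        return left.value() + "+" + right + " (trace-id=" + left.traceId() + ")";
    }

    // Step 1 followed by an inner join against the table, keyed by the message key.
    static List<String> enrichAndJoin(List<Message> stream, Map<String, String> table, String headerName) {
        List<String> out = new ArrayList<>();
        for (Message m : stream) {
            ValueWithHeader enriched = copyHeader(m, headerName);
            String right = table.get(m.key());
            if (right != null) {
                out.add(joiner(enriched, right));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));
        List<Message> stream = List.of(new Message("k1", "order", headers));
        Map<String, String> table = Map.of("k1", "customer");
        // prints [order+customer (trace-id=abc-123)]
        System.out.println(enrichAndJoin(stream, table, "trace-id"));
    }
}
```

The point of the split is that the header lookup no longer needs its own store. The join against the `KTable` stays in the DSL, so the synchronization that the join gives you is kept.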