java apache-kafka apache-kafka-streams

Wait for store to load when using KStream.transform


The problem I want to solve starts out simple: I want to enrich messages from one stream with data available in another stream. When the application starts, I want to make sure the secondary stream has been loaded so that I don't start reading from an empty store. A KTable solves this when I join my stream with it, because the join keeps the two sources in sync based on their record timestamps.

The catch is that I need access to the message headers, which a join operation does not expose. So it seems I cannot use a join and need a different operation, such as a transform, where I can read the headers and query a state store. That store is fed by the secondary topic (the one I was previously using to feed the KTable).

The problem now: how do I wait for the store to load before processing new messages? In other words, since there is no explicit relationship between the two topics (as there was with the KTable join), when I start the application for the first time and the first message arrives on the primary stream, looking up correlated data in the store (fed by the secondary stream) finds nothing. How do I avoid this?


Solution

  • You can presumably do whatever you want, such as waiting and checking timestamps yourself, if you drop down to a lower-level API (the plain consumer/producer).

    With the higher-level API, a join is indeed the only way I know to force that kind of synchronization between streams, and using a transform that accesses the store removes it, as you described.

    What you could do is split the work instead of doing everything in a single transform. First, transform your stream so that each message carries explicit fields for the headers you need. Then join that stream with the KTable you mentioned; the joiner can access those header values because they are now part of the message.
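The two-step idea above can be illustrated with a minimal plain-Java model that has no Kafka dependency. It is a sketch under stated assumptions, not Kafka Streams code: step 1 stands in for a `transformValues()` step that reads `ProcessorContext#headers()` and copies a header into the value, and step 2 stands in for `KStream#join(KTable, ValueJoiner)`, modeled as an inner lookup on the record key. The header name `trace-id`, the classes `Incoming`, `WithHeaders` and `Enriched`, and the sample data are all hypothetical. The model does not reproduce Kafka Streams' timestamp-based synchronization; that is exactly what the real KTable join provides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "copy headers into the value, then join with a table".
// All names here are hypothetical; no Kafka classes are used.
class HeaderEnrichThenJoin {

    // Stand-in for a consumed record: key, value and headers.
    static final class Incoming {
        final String key;
        final String value;
        final Map<String, String> headers;

        Incoming(String key, String value, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Step 1 output: the header is now an explicit field of the message.
    static final class WithHeaders {
        final String value;
        final String traceId;

        WithHeaders(String value, String traceId) {
            this.value = value;
            this.traceId = traceId;
        }
    }

    // Join output: original value, copied header, and the table value.
    static final class Enriched {
        final String value;
        final String traceId;
        final String tableValue;

        Enriched(String value, String traceId, String tableValue) {
            this.value = value;
            this.traceId = traceId;
            this.tableValue = tableValue;
        }
    }

    // Step 1: analogous to a transformValues() step reading the record headers.
    static WithHeaders copyHeaders(Incoming in) {
        return new WithHeaders(in.value, in.headers.get("trace-id"));
    }

    // Step 2: analogous to an inner stream-table join on the key.
    // Records whose key has no table entry produce no output.
    static List<Enriched> joinWithTable(List<Incoming> stream, Map<String, String> table) {
        List<Enriched> out = new ArrayList<>();
        for (Incoming in : stream) {
            WithHeaders withHeaders = copyHeaders(in);
            String tableValue = table.get(in.key);
            if (tableValue != null) {
                out.add(new Enriched(withHeaders.value, withHeaders.traceId, tableValue));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> customers = new HashMap<>();
        customers.put("c1", "Alice");

        Map<String, String> headers = new HashMap<>();
        headers.put("trace-id", "t-42");

        List<Incoming> orders = new ArrayList<>();
        orders.add(new Incoming("c1", "order-1", headers));
        orders.add(new Incoming("c2", "order-2", headers)); // no table entry, so no output

        for (Enriched e : joinWithTable(orders, customers)) {
            System.out.println(e.value + " " + e.traceId + " " + e.tableValue);
        }
        // prints "order-1 t-42 Alice"
    }
}
```

The point of the split is that the joiner only ever sees keys and values, so anything it needs from the headers has to be moved into the value before the join. In a real topology that means defining a value type (and serde) that includes the header fields.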