apache-kafka, ksqldb, ktable

How do you get the latest offset from a remote query to a Table in ksqlDB?


I have an architecture where I would like to query a ksqlDB table derived from Kafka Stream A (created by ksqlDB). On startup, Service A will load all of the data from this table into a hashmap, and afterward it will start consuming from Kafka Stream A and act on any events to update this hashmap. I want to avoid any race condition in which I would miss events that were propagated to Kafka Stream A in the window between querying the table and starting to consume from Kafka Stream A. Is there a way to retrieve the latest offset that my query of the table reflects, so that I can use that offset to start consuming from Kafka Stream A?


Another thing to mention: we have hundreds of instances of our app going up and down, so reading directly off the Kafka stream is not an option. Reading an entire stream's worth of data every time our apps come up is not a scalable solution. Reading the event stream's data into a hashmap on the service is a hard requirement. This is why the ksqlDB table seems like a good option: we can get the latest state of the data in the format we need and then just update it based on events from the stream. Kafka Stream A is essentially a CDC stream off of a MySQL table that has been enriched with other data.


Solution

  • You used "materialized view", but I'm going to pretend I heard "table". I have often used materialized views in a historical reporting context, but not with live updates. I assume that yours will behave similarly to a "table".

    I assume that all events, and DB rows, have timestamps. Hopefully they are "mostly monotonic", so applying a small safety window lets us efficiently process just the relevant recent ones.


    The crux of the matter is racing updates. We need to prohibit races.

    Each time an instance of a writer, such as your app, comes up, assign it a new name. Rolling a GUID is often the most convenient way to do that; perhaps prepend it with a timestamp if sort order matters.

    Ensure that each DB row mentions that "owning" name.
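    As a minimal sketch of that naming scheme (the timestamp prefix and the "owner" column are illustrative assumptions, not part of any API):

    ```java
    import java.time.Instant;
    import java.util.UUID;

    public class WriterIdentity {
        public static void main(String[] args) {
            // Roll a fresh GUID per instance; prefix with a timestamp only if
            // the names need to sort by startup order.
            String instanceName = Instant.now().toEpochMilli() + "-" + UUID.randomUUID();
            System.out.println("owning name for this instance: " + instanceName);
            // Each row this instance writes would then carry instanceName in an
            // "owner" column, so replayed events can be traced back to their writer.
        }
    }
    ```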

    You wrote: "I want to avoid any race condition in which I would miss any events that were propagated to Kafka Stream A in the time between I queried the materialized view, and when I started consuming off Kafka Stream A."

    We will need a guaranteed-monotonic column, either an integer ID or a timestamp. Let's call it ts.

    1. Query m = max(ts).
    2. Do a big query of records with ts < m, slowly filling your hashmap.
    3. Start consuming Stream A.
    4. Do a small query of records with ts >= m, updating the hashmap.
    5. Continue to loop through subsequently arriving Stream A records.

    Now you're caught up, and can maintain the hashmap in sync with the DB; a sketch of this sequence appears below.
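    Here is a minimal sketch of steps 1-5, assuming hypothetical helpers (queryMaxTs, queryRowsBefore, queryRowsFrom, startConsumingStreamA) that would wrap your pull queries against the table and your Stream A consumer; none of these names come from the ksqlDB or Kafka APIs.

    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BiConsumer;

    public class StartupCatchUp {

        // key -> latest value; the in-memory state this service maintains
        private final Map<String, String> state = new HashMap<>();

        public void start() {
            long m = queryMaxTs();                     // 1. snapshot max(ts)

            for (String[] row : queryRowsBefore(m)) {  // 2. big query: ts < m
                state.put(row[0], row[1]);
            }

            // 3. start consuming Stream A; each event updates the map
            startConsumingStreamA((key, value) -> state.put(key, value));

            for (String[] row : queryRowsFrom(m)) {    // 4. small query: ts >= m
                state.put(row[0], row[1]);
            }
            // 5. from here on, the consumer callback keeps the map in sync
        }

        // Hypothetical plumbing; a real implementation would back these with
        // ksqlDB pull queries and a Kafka consumer loop.
        private long queryMaxTs() { return 0L; }
        private List<String[]> queryRowsBefore(long ts) { return List.of(); }
        private List<String[]> queryRowsFrom(long ts) { return List.of(); }
        private void startConsumingStreamA(BiConsumer<String, String> onEvent) { }
    }
    ```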

    Your business logic probably requires that you treat DB rows mentioning the "self" GUID differently from rows that existed prior to startup. Think of it as de-dup, or ignoring replayed rows.


    You may find offsetsForTimes() useful.
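    For example, here is a sketch that seeks a plain KafkaConsumer to the first offsets whose record timestamps are at or after your snapshot timestamp (the topic name, partition, group id, and safety window are placeholders):

    ```java
    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SeekToTimestamp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "service-a");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // The timestamp captured in step 1, minus a small safety window.
            long m = System.currentTimeMillis() - 60_000;

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("stream-a", 0);
                consumer.assign(List.of(tp));

                Map<TopicPartition, Long> query = new HashMap<>();
                query.put(tp, m);

                // offsetsForTimes returns, per partition, the earliest offset whose
                // record timestamp is >= the requested timestamp (or null if none).
                Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
                OffsetAndTimestamp oat = offsets.get(tp);
                if (oat != null) {
                    consumer.seek(tp, oat.offset());
                } else {
                    consumer.seekToEnd(List.of(tp));
                }

                consumer.poll(Duration.ofMillis(100));
            }
        }
    }
    ```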

    There's also listOffsets().
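    A sketch of the Admin-client route (available since Kafka 2.5); OffsetSpec.forTimestamp(...) can be used in place of OffsetSpec.latest() to resolve a timestamp to an offset instead:

    ```java
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class LatestOffsetLookup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                TopicPartition tp = new TopicPartition("stream-a", 0);

                // Ask the broker for the current end offset of the partition.
                ListOffsetsResult result = admin.listOffsets(Map.of(tp, OffsetSpec.latest()));
                long latest = result.partitionResult(tp).get().offset();
                System.out.println("Latest offset for " + tp + " = " + latest);
            }
        }
    }
    ```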