Tags: apache-kafka, apache-kafka-streams

How to lookup dynamically in a KTable?


I am currently developing a Kafka Streams app that enriches incoming events with data from our database. The enrichment data is stored in topics that Debezium keeps constantly updated. Some enrichments are simple to achieve because they are just an equi-join or left join on the event ID. Others, however, require computing a value from the incoming event's timestamp:

Let's say my incoming event topic has this schema:

user_id: Long
timestamp: Instant

Then I need to map this event to the following output:

user_id: Long
has_planned_meetings_in_the_future: Boolean

The meetings table is stored in a separate topic, with the following record structure:

user_id: Long
meeting_date: Instant

So for each event, I need to look up in the meetings topic whether there are records for this specific user AND with a meeting date later than the event's timestamp.
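To pin down the semantics of that lookup, here is the rule as plain Java over an in-memory list. It is a minimal sketch, not a Kafka Streams solution: the record names `Event`, `Meeting`, and `EnrichedEvent` and the class `Enrichment` are hypothetical stand-ins for the schemas above, and reading "in the future" as "strictly after the event timestamp" is an assumption.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical types mirroring the schemas in the question.
record Event(long userId, Instant timestamp) {}
record Meeting(long userId, Instant meetingDate) {}
record EnrichedEvent(long userId, boolean hasPlannedMeetingsInTheFuture) {}

class Enrichment {
    // Naive version of the rule: scan every meeting and look for one that
    // belongs to the event's user and is strictly after the event timestamp.
    static EnrichedEvent enrich(Event event, List<Meeting> meetings) {
        boolean hasFuture = meetings.stream()
                .anyMatch(m -> m.userId() == event.userId()
                        && m.meetingDate().isAfter(event.timestamp()));
        return new EnrichedEvent(event.userId(), hasFuture);
    }
}
```

A full scan per event is exactly what a keyed, ordered state store is meant to avoid; the sketch only fixes what the answer has to compute.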

How can I do that in Kafka Streams?


Solution

  • One approach that would work is to consume the meetings topic in your application and store the meetings in a state store.

    You can then query the state store from the processor that handles your events, using the criteria you described (user ID and meeting date).

    Here is a simple example that stores meetings:

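    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Older Processor API; deprecated in Kafka 3.0 in favour of org.apache.kafka.streams.processor.api.Processor.
    public class MyMeetingsProcessor implements Processor<Object, Meeting> {

        private final String meetingsKeyStore = "meetings-key-store";
        private KeyValueStore<Object, Meeting> meetings;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            // The store must be added to the topology and connected to this processor.
            meetings = (KeyValueStore<Object, Meeting>) context.getStateStore(meetingsKeyStore);
        }

        @Override
        public void process(Object key, Meeting value) {
            // One Meeting per key: a newer meeting with the same key replaces the older one.
            meetings.put(key, value);
        }

        @Override
        public void close() {}
    }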
    

    To query the state store when consuming an event, you could do:

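    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class MyEventsProcessor implements Processor<Object, Event> {

        private final String meetingsKeyStore = "meetings-key-store";
        private KeyValueStore<Object, Meeting> meetings;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            meetings = (KeyValueStore<Object, Meeting>) context.getStateStore(meetingsKeyStore);
        }

        @Override
        public void process(Object key, Event value) {
            // Assumes both topics are keyed by user_id and co-partitioned, so this task's
            // local store holds the meetings for the event's user.
            Meeting meeting = meetings.get(key);
            if (meeting != null) {
                // do something fun, e.g. compare the meeting date with the event timestamp
            }
        }

        @Override
        public void close() {}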
    }
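The processors above keep one `Meeting` per key, so a second meeting for the same user overwrites the first, and the lookup never compares dates. One way to make the date criterion cheap is to key the store by (user_id, meeting_date), so that one user's meetings sit next to each other in date order, and answer each event with a range scan. The sketch below shows only that key layout and lookup, using a `TreeSet` as an in-memory stand-in for an ordered store; `MeetingKey`, `MeetingIndex`, and `hasMeetingAfter` are hypothetical names. Kafka Streams' `KeyValueStore` does expose `range(from, to)`, but it orders keys by their serialized bytes, so carrying this over assumes a key serde whose byte order matches (user, date) order; treat that as something to verify.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical composite key: (user_id, meeting_date).
record MeetingKey(long userId, Instant meetingDate) {}

// Hypothetical in-memory stand-in for an ordered key-value store.
class MeetingIndex {
    // Sorted by user first, then by meeting date, so one user's meetings
    // form a contiguous, date-ordered run of keys.
    private final NavigableSet<MeetingKey> keys = new TreeSet<>(
            Comparator.comparingLong(MeetingKey::userId)
                    .thenComparing(MeetingKey::meetingDate));

    // Called for each insert read from the meetings topic.
    void put(long userId, Instant meetingDate) {
        keys.add(new MeetingKey(userId, meetingDate));
    }

    // Called when a meeting is deleted upstream.
    void remove(long userId, Instant meetingDate) {
        keys.remove(new MeetingKey(userId, meetingDate));
    }

    // Range scan over this user's keys only: exclusive lower bound at the
    // event time (strictly after), inclusive upper bound at Instant.MAX.
    boolean hasMeetingAfter(long userId, Instant eventTime) {
        MeetingKey from = new MeetingKey(userId, eventTime);
        MeetingKey to = new MeetingKey(userId, Instant.MAX);
        return !keys.subSet(from, false, to, true).isEmpty();
    }
}
```

Because both bounds carry the same user ID, the scan never touches another user's meetings. An upstream update that changes `meeting_date` would need a `remove` of the old key followed by a `put` of the new one, which means the old date has to be available from the change event or kept in the store.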