Tags: java, apache-kafka, stream, apache-kafka-streams

Simultaneous operations on KStream & KTables


I'm trying to implement a use case in Kafka Streams where I populate a KTable by applying some filters to an event stream; let's call this a tracking table, where the key is derived from the event and the value is the event. For subsequent events I check this table to see whether they're tracked, and update the event if it is tracked or publish it to a different topic if it isn't. I'm not sure how to do this simultaneously. Here's what I have so far.

// Branch based on conditions
KStream<String, Event>[] segregatedRecords = branches[0]
                       .branch((key, event) -> event.getStatus().getStatus().equals("A"),
                        (key, event) -> event.getStatus().getStatus().equals("B"),
                        (key, event) -> event.getStatus().getStatus().equals("C"));


// Store events with status A to a topic
segregatedRecords[0]
                .selectKey((key, event) -> createKey(event))
                .mapValues(transform)
                .to(intermediateTopic);

// Load topic from previous step as GlobalKtable
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic);

// The following step is where I'm stuck, because I can't perform conditional actions:
// if the event exists in the tracking table I want to update it, but if not, how do I publish it to a different topic?
segregatedRecords[1]
                 // derive key for lookup
                .selectKey((key, event) -> createKey(event))
                // update the event status in the table 
                .join(trackedEvents, (key, value) -> key, (event, tracked) -> modifiedEvent)
                .to(intermediateTopic);

// Other events will need to refer to the latest information in the tracking table for further processing 



Solution

  • You can do this by branching segregatedRecords[1] into 2 sub-topologies: one branch performs the table lookup as in your code, and the other branch uses the low-level Processor API (a transformValues in this case) to check whether the underlying GlobalKTable state store already contains a record for the newly derived key. If a record exists, the event has already been joined in the first sub-topology, so we transform it to a null value and filter those nulls out in the next step. I updated your code a little bit:

    //give your GlobalKTable a name to query later
    GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic, Materialized.as("tracked_event_global_store"));
    
    KStream<String, Event> derivedKStream = segregatedRecords[1]
        // derive key for lookup
        .selectKey((key, event) -> createKey(event));
    // this sub-topology performs the table lookup as normal: update the event status in the table
    derivedKStream.join(trackedEvents, (key, value) -> key, (event, tracked) -> modifiedEvent)
        .to(intermediateTopic);
    // this sub-topology checks whether the event already exists in trackedEvents; if it does, the event has
    // already been joined in the first sub-topology, so we map it to a null value and filter it out in the next step
    derivedKStream.transformValues(() -> new ValueTransformerWithKey<String, Event, Event>() {
        // the underlying store backing the tracked GlobalKTable
        KeyValueStore<String, Event> trackedKvStore;
        @Override
        public void init(ProcessorContext context) {
            // look the store up by the name given to the GlobalKTable above
            trackedKvStore = (KeyValueStore<String, Event>) context.getStateStore("tracked_event_global_store");
        }
    
        @Override
        public Event transform(String derivedKey, Event event) {
            // if the event already exists in trackedEvents, return null so it can be filtered out in the next step
            if (trackedKvStore.get(derivedKey) != null) {
                return null;
            }
            // the event does not exist in trackedEvents, keep it and send it to a different topic
            return event;
        }
    
        @Override
        public void close() {
        }
    })
    .filter((derivedKey, event) -> event != null)
    .to("your different toic name");
    

    Update: regarding the problem that you cannot create both a GlobalKTable and a KStream from the single intermediate topic (a topic cannot be read multiple times, as described here):

    1. Create a dedicated input topic for the GlobalKTable (this topic must have log compaction enabled):
    KStream<Object, Object> intermediateKStream = streamsBuilder.stream(intermediate);
    intermediateKStream.to(trackedInputTopic);
    //instead of building GlobalKTable from intermediate, use this dedicated topic trackedInputTopic
    GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(trackedInputTopic, Materialized.as("tracked_event_global_store"));
    
    //Perform things you want to do with the intermediate topic
    intermediateKStream
            ...
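
    One way to create the log-compacted trackedInputTopic up front is with the Kafka AdminClient. A rough sketch (exception handling omitted), assuming the partition count, replication factor and bootstrap server are placeholders you would adjust:

    Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(adminProps)) {
        NewTopic trackedInput = new NewTopic(trackedInputTopic, 3, (short) 1)
                // cleanup.policy=compact so the GlobalKTable keeps only the latest event per key
                .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
        admin.createTopics(Collections.singleton(trackedInput)).all().get();
    }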