
Query KTable in the same Application where it is created


I have a Kafka Streams application in which I read from a topic, aggregate, and materialize the result in a KTable. I then create a stream and run some logic on it. In that stream processing, I want to use some data from the aforementioned KTable. Once I start the Streams app, how do I access the KTable's data again? I don't want to push the KTable to a new topic.

KStream<String, MyClass> source = builder.stream("my-topic");
KTable<Windowed<String>, Long> kTable =
            source.groupBy((key, value) -> value.getKey(),
                    Grouped.<String, MyClass>as("repartition-1")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(new MyClassSerDes()))
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("test-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(Serdes.Long()));
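To make the shape of the materialized `test-store` concrete, here is a plain-Java model (no Kafka dependency; an illustrative assumption, not Kafka's actual store implementation) of what a tumbling `TimeWindows.of(...)` count keeps: one count per key and window start, where the window start is the record timestamp rounded down to a multiple of the window size (tumbling windows are aligned to the epoch). The class and method names are hypothetical; `fetch` only loosely mirrors the idea of looking up a key in a given window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of a tumbling-window count (no Kafka dependency).
// It mimics the contents of a windowed count store: key -> (window start -> count).
public class WindowCountModel {

    private final long windowSizeMs;
    private final Map<String, Map<Long, Long>> counts = new HashMap<>();

    public WindowCountModel(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Tumbling windows are epoch-aligned: round the timestamp down to a
    // multiple of the window size (assumes non-negative timestamps).
    public long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Counts one record and returns the updated count for its window.
    public long add(String key, long timestampMs) {
        long start = windowStart(timestampMs);
        return counts.computeIfAbsent(key, k -> new HashMap<>())
                     .merge(start, 1L, Long::sum);
    }

    // Returns the count for the window starting at windowStartMs, or null if none exists.
    public Long fetch(String key, long windowStartMs) {
        Map<Long, Long> perKey = counts.get(key);
        return perKey == null ? null : perKey.get(windowStartMs);
    }

    public static void main(String[] args) {
        WindowCountModel model = new WindowCountModel(5_000L); // 5-second windows
        model.add("a", 1_000L);   // window [0, 5000)
        model.add("a", 4_999L);   // same window
        model.add("a", 5_000L);   // window [5000, 10000)
        System.out.println(model.fetch("a", 0L));     // 2
        System.out.println(model.fetch("a", 5_000L)); // 1
        System.out.println(model.fetch("b", 0L));     // null
    }
}
```

With this layout in mind, "the count for the previous window" of a record is simply the entry at `windowStart - windowSize` for the same key.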

This is where I want to use data from the KTable:

inputstream.groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
    .withRetention(Duration.ofMinutes(30)))
    .toStream()
    .filter((k, v) -> {
        // Here, get the count for the previous window
        // and use that count for some computation.
        return true; // placeholder
    });

Solution

  • You can connect the KTable's store to a processor/transformer. For your case, you can replace `filter` with `flatTransform` (or a sibling such as `transform`, `flatTransformValues`, etc., depending on whether you need access to the key) and connect the store to that operator:

    inputstream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
            .withRetention(Duration.ofMinutes(30))
        )
        .toStream()
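        // requires v2.2; otherwise use `transform()`
        // if you don't need access to the key, consider using `flatTransformValues` (v2.3)
        .flatTransform(
            // the transformer's output type must be exactly Iterable<KeyValue<...>>
            // to match the signature of `flatTransform()`
            () -> new Transformer<Windowed<myKey>,
                                  Long,
                                  Iterable<KeyValue<Windowed<myKey>, Long>>>() {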
    
                private ReadOnlyWindowStore<myKey, Long> store;
    
                public void init(final ProcessorContext context) {
                    // get a handle on the store by its name
                    // as specified via `Materialized` above;
                    // should be read-only
                    store = (ReadOnlyWindowStore<myKey, Long>)context.getStateStore("str");
                }
    
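                public List<KeyValue<Windowed<myKey>, Long>> transform(final Windowed<myKey> key,
                                                                       final Long value) {

                    // access `store` as you wish to make a filtering decision;
                    // hypothetical example: look up the count of the previous
                    // 1-minute window for the same key and keep the record only
                    // if the count grew (or there is no previous window)
                    final long previousWindowStart =
                        key.window().start() - Duration.ofMinutes(1).toMillis();
                    final Long previousCount = store.fetch(key.key(), previousWindowStart);

                    if (previousCount == null || value > previousCount) {
                        // record passes
                        return Collections.singletonList(KeyValue.pair(key, value));
                    } else {
                        // drop record
                        return Collections.emptyList();
                    }
                }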
    
                public void close() {} // nothing to do
            },
            "str" // connect the KTable store to the transformer using its name
                  // as specified via `Materialized` above
        );
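The transformer acts as a filter because `flatTransform` forwards every element of the returned `Iterable`: a singleton list passes the record on, and an empty list drops it. The following plain-Java sketch models only that decision, with a `Map` standing in for the `str` window store. It has no Kafka dependency, and the "count increased versus the previous window" rule, the `key@windowStart` store-key format, and all class and method names are hypothetical assumptions for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the "flatTransform as filter" pattern (no Kafka dependency).
public class PreviousWindowFilter {

    // Stand-in for a windowed count result: key, window start, and current count.
    public static final class Result {
        public final String key;
        public final long windowStart;
        public final long count;

        public Result(String key, long windowStart, long count) {
            this.key = key;
            this.windowStart = windowStart;
            this.count = count;
        }
    }

    private final long windowSizeMs;
    // Stand-in for the window store: "key@windowStart" -> count (hypothetical format).
    private final Map<String, Long> store;

    public PreviousWindowFilter(long windowSizeMs, Map<String, Long> store) {
        this.windowSizeMs = windowSizeMs;
        this.store = store;
    }

    public static String storeKey(String key, long windowStart) {
        return key + "@" + windowStart;
    }

    // Returns a singleton list to forward the record, or an empty list to drop it.
    public List<Result> transform(Result r) {
        long previousStart = r.windowStart - windowSizeMs;
        Long previousCount = store.get(storeKey(r.key, previousStart));
        if (previousCount == null || r.count > previousCount) {
            return Collections.singletonList(r);
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        store.put(storeKey("a", 0L), 3L); // previous 1-minute window of "a" had 3 records
        PreviousWindowFilter filter = new PreviousWindowFilter(60_000L, store);
        System.out.println(filter.transform(new Result("a", 60_000L, 5L)).size()); // 1: 5 > 3, forwarded
        System.out.println(filter.transform(new Result("a", 60_000L, 2L)).size()); // 0: 2 <= 3, dropped
        System.out.println(filter.transform(new Result("b", 60_000L, 1L)).size()); // 1: no previous window
    }
}
```

In the real topology, the lookup goes through the `ReadOnlyWindowStore` handle obtained in `init()` rather than a `Map`, and the store only has to be connected by name (`"str"`) for the transformer to see it.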