apache-kafka apache-kafka-streams ksqldb windowed

Kafka Operations on Windowed KTables


I would like to do some further operations on a windowed KTable. For background, I have a topic with records of the form {clientId, txTimestamp, txAmount}. From this topic I created a stream, partitioned by clientId, whose record timestamp is taken from the txTimestamp field. Starting from this stream, I want to count the transactions per clientId in 1-hour tumbling windows. This is done with something similar to the following:

    CREATE TABLE transactions_per_client
      WITH (kafka_topic='transactions_per_client_topic') AS
      SELECT clientId,
             COUNT(*) AS transactions_per_client,
             WINDOWSTART AS window_start,
             WINDOWEND AS window_end
      FROM transactions_stream
      WINDOW TUMBLING (SIZE 1 HOURS)
      GROUP BY clientId;
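
To make the windowing semantics concrete, here is a plain-Java sketch (no Kafka dependency, Java 16+ for records) of what the query above computes, assuming epoch-aligned tumbling windows as in Kafka Streams: each transaction goes into the 1-hour window whose start is its timestamp rounded down to a multiple of the window size, and transactions are counted per clientId and window. HourlyCounts, Transaction, windowStart and countPerClientPerWindow are hypothetical names that only illustrate the semantics; they are not part of ksqlDB or Kafka Streams.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the ksqlDB query: COUNT(*) per clientId in 1-hour tumbling windows.
// Hypothetical names; no Kafka classes are used.
public class HourlyCounts {
    public static final long WINDOW_SIZE_MS = Duration.ofHours(1).toMillis();

    // Mirrors the {clientId, txTimestamp, txAmount} records of the source topic
    public record Transaction(long clientId, long txTimestamp, double txAmount) {}

    // Epoch-aligned tumbling window: round the timestamp down to a multiple of the size
    public static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // clientId -> (window start -> number of transactions in that window)
    public static Map<Long, TreeMap<Long, Long>> countPerClientPerWindow(List<Transaction> txs) {
        Map<Long, TreeMap<Long, Long>> counts = new TreeMap<>();
        for (Transaction tx : txs) {
            counts.computeIfAbsent(tx.clientId(), id -> new TreeMap<>())
                  .merge(windowStart(tx.txTimestamp()), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long h = WINDOW_SIZE_MS;
        List<Transaction> txs = List.of(
                new Transaction(1, h + 10, 5.0),
                new Transaction(1, h + 20, 7.5),
                new Transaction(2, h + 30, 1.0),
                new Transaction(1, 2 * h + 5, 3.0));
        // Client 1: 2 transactions in [h, 2h) and 1 in [2h, 3h); client 2: 1 in [h, 2h)
        System.out.println(countPerClientPerWindow(txs));
    }
}
```

The real query also emits intermediate updates as each window fills up; this sketch only shows the final counts.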

The aggregations work as expected and yield values similar to:

    ClientId  Transactions_per_client  WindowStart  WindowEnd
    1         12                       1            2
    2         8                        1            2
    1         24                       2            3
    1         19                       3            4

What I want to do now is process this table further and add a column with the difference between a client's transaction count in a window and its count in the immediately preceding window. For the previous table, that would look something like this:

    ClientId  Transactions_per_client  WindowStart  WindowEnd  Deviation
    1         12                       1            2          0
    2         8                        1            2          0
    1         24                       2            3          12
    1         19                       3            4          -5
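
The Deviation column can be pinned down with a small plain-Java sketch (Java 16+), assuming "adjacent" means the immediately preceding window of the same size for the same client, so a client with no count in that window gets 0. WindowedCount, Row and addDeviation are hypothetical names, and the sketch works on a finished in-memory table rather than on a Kafka topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Adds a Deviation column: count(window) - count(preceding window of the same client), or 0
// when the preceding window has no count. Hypothetical names; operates on in-memory rows.
public class AdjacentWindowDeviation {
    public record WindowedCount(long clientId, long count, long windowStart, long windowEnd) {}
    public record Row(WindowedCount window, long deviation) {}

    public static List<Row> addDeviation(List<WindowedCount> table) {
        // Index each count by (clientId, windowStart)
        Map<List<Long>, Long> countByClientAndStart = new HashMap<>();
        for (WindowedCount w : table) {
            countByClientAndStart.put(List.of(w.clientId(), w.windowStart()), w.count());
        }
        List<Row> result = new ArrayList<>();
        for (WindowedCount w : table) {
            long size = w.windowEnd() - w.windowStart();
            // The immediately preceding tumbling window starts exactly one window size earlier
            Long previous = countByClientAndStart.get(List.of(w.clientId(), w.windowStart() - size));
            result.add(new Row(w, previous == null ? 0L : w.count() - previous));
        }
        return result;
    }

    public static void main(String[] args) {
        // The example table from the question (window bounds in abstract units)
        List<WindowedCount> table = List.of(
                new WindowedCount(1, 12, 1, 2),
                new WindowedCount(2, 8, 1, 2),
                new WindowedCount(1, 24, 2, 3),
                new WindowedCount(1, 19, 3, 4));
        for (Row r : addDeviation(table)) {
            System.out.println(r.window().clientId() + " " + r.window().count() + " "
                    + r.window().windowStart() + " " + r.window().windowEnd() + " " + r.deviation());
        }
    }
}
```

Keying on the window start rather than on "the last window seen" means a gap of one or more empty windows yields 0, not the difference to an older window; adjust the lookup if the other interpretation is wanted.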

What would be the best way to achieve this, using either Kafka Streams or ksqlDB? I tried to create this column with a user-defined aggregate function (UDAF), but it cannot be applied to a KTable, only to a KStream.


Solution

  • For future reference, the official answer at the time of writing (April 2022) is that this cannot be done with the DSL in either ksqlDB or Kafka Streams, because "Windowed-TABLE are kind of a “dead end” in ksqlDB atm, and also for Kafka Streams, you cannot really use the DSL to further process the data" (answer on the Confluent forum: https://forum.confluent.io/t/aggregations-on-windowed-ktables/4340). The suggestion there is to use the Processor API, which turns out to be fairly straightforward. At a high level, it would look something like this:

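    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.*; // TimeWindowedDeserializer/Serializer, Windowed, WindowedSerdes
    import org.apache.kafka.streams.state.*;   // KeyValueStore, StoreBuilder, Stores
    
    // Must match WINDOW TUMBLING (SIZE 1 HOURS) in the ksqlDB query
    long windowSizeMs = Duration.ofHours(1).toMillis();
    Topology topology = new Topology();
    // Source: the topic backing the windowed KTable (key Windowed<Long>, value Long count)
    topology.addSource(NAME_OF_SOURCE_IN_THE_NEW_TOPOLOGY,
                    new TimeWindowedDeserializer<>(Serdes.Long().deserializer(), windowSizeMs),
                    Serdes.Long().deserializer(),
                    SOURCE_TOPIC);
    
    topology.addProcessor(
                    NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY,
                    () -> new Aggregator(storeName),
                    NAME_OF_SOURCE_IN_THE_NEW_TOPOLOGY);
    // Store with the last seen count per windowed key, attached to the processor
    StoreBuilder<KeyValueStore<Windowed<Long>, Long>> storeBuilder = Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(storeName),
                    new WindowedSerdes.TimeWindowedSerde<>(Serdes.Long(), windowSizeMs),
                    Serdes.Long());
    topology.addStateStore(storeBuilder, NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY);
    
    topology.addSink(
                    NAME_OF_SINK_IN_THE_NEW_TOPOLOGY,
                    sinkTopic,
                    new TimeWindowedSerializer<>(Serdes.Long().serializer()),
                    transactionPerNumericKeySerializer, // Serializer<TransactionPerNumericKey>, POJO with the deviation field
                    NAME_OF_PROCESSOR_IN_THE_NEW_TOPOLOGY);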
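
The windowed deserializer and the store serde both need the window size because, as far as I understand the Kafka Streams implementation, a time-windowed key is serialized as the inner key bytes followed by the 8-byte window start, and the window end is recomputed from the size when deserializing. The approach above also assumes ksqlDB writes its windowed keys in this format. Below is a plain-Java model of that layout for a Long key; WindowedKeyLayout, encode and decode are hypothetical names and no Kafka classes are used.

```java
import java.nio.ByteBuffer;

// Plain-Java model (hypothetical names, no Kafka dependency) of a Windowed<Long> key as
// produced by a time-windowed serializer wrapping a Long serializer:
// [8-byte big-endian clientId][8-byte big-endian window start]. The window end is not stored.
public class WindowedKeyLayout {
    public record DecodedKey(long clientId, long windowStart, long windowEnd) {}

    public static byte[] encode(long clientId, long windowStart) {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(clientId).putLong(windowStart).array();
    }

    // The end has to be reconstructed from the configured window size
    public static DecodedKey decode(byte[] bytes, long windowSizeMs) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long clientId = buf.getLong(0);
        long start = buf.getLong(bytes.length - Long.BYTES);
        return new DecodedKey(clientId, start, start + windowSizeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        byte[] key = encode(42L, 7 * hour);
        System.out.println(key.length + " bytes -> " + decode(key, hour));
    }
}
```

A consequence of this layout is that two windowed keys with the same client and the same start map to the same store entry, which is what allows the processor to look up the preceding window by its start alone.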
    

    The Aggregator in the previous section is an implementation of org.apache.kafka.streams.processor.api.Processor that stores every count it sees under its windowed key, so that it can look up the count of the preceding window for the same client. Again, at a high level, its process() method would look something like this:

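    // Inside process(Record<Windowed<Long>, Long> kafkaRecord); kvStore, context and
    // windowSizeMs are fields set up in init() / the constructor.
    // The store key serializes only clientId + window start, so the preceding window is
    // addressed by its start (TimeWindow is org.apache.kafka.streams.kstream.internals.TimeWindow).
    long windowStart = kafkaRecord.key().window().start();
    Windowed<Long> previousWindow = new Windowed<>(
            kafkaRecord.key().key(), new TimeWindow(windowStart - windowSizeMs, windowStart));
    Long previousTransactionAggregate = kvStore.get(previousWindow);
    
    long deviation;
    if (previousTransactionAggregate != null) {
        deviation = kafkaRecord.value() - previousTransactionAggregate;
    } else {
        deviation = 0L; // no count stored for the preceding window of this client
    }
    // Remember this window's latest count so the next window can compare against it
    kvStore.put(kafkaRecord.key(), kafkaRecord.value());
    Record<Windowed<Long>, TransactionPerNumericKey> newRecord =
                    new Record<>(
                            kafkaRecord.key(),
                            new TransactionPerNumericKey(
                                    kafkaRecord.key().key(), kafkaRecord.value(), deviation),
                            kafkaRecord.timestamp());
    
    context.forward(newRecord);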
    

    TransactionPerNumericKey in the previous section is the POJO for the enriched windowed aggregate; it holds the client key, the transaction count and the deviation.
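
To check the processor logic without a broker, the following sketch replays the kind of changelog a windowed count emits (several updates per window as the count grows, some of which may be collapsed by caching) through an in-memory map standing in for kvStore. DeviationProcessorSim, Update and Output are hypothetical names, and a window size of 1 matches the abstract window boundaries in the tables above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in (hypothetical names) for the Aggregator processor and its key-value store:
// each update of a windowed count is compared with the stored count of the preceding window.
public class DeviationProcessorSim {
    public record Update(long clientId, long windowStart, long count) {}
    public record Output(long clientId, long windowStart, long count, long deviation) {}

    private final long windowSize;
    // Plays the role of kvStore: (clientId, windowStart) -> latest count
    private final Map<List<Long>, Long> store = new HashMap<>();

    public DeviationProcessorSim(long windowSize) {
        this.windowSize = windowSize;
    }

    public Output process(Update u) {
        Long previous = store.get(List.of(u.clientId(), u.windowStart() - windowSize));
        long deviation = previous == null ? 0L : u.count() - previous;
        store.put(List.of(u.clientId(), u.windowStart()), u.count());
        return new Output(u.clientId(), u.windowStart(), u.count(), deviation);
    }

    public static void main(String[] args) {
        DeviationProcessorSim processor = new DeviationProcessorSim(1);
        // Counts grow within a window; only the last update per window is the final value
        List<Update> changelog = List.of(
                new Update(1, 1, 11), new Update(1, 1, 12), new Update(2, 1, 8),
                new Update(1, 2, 23), new Update(1, 2, 24), new Update(1, 3, 19));
        for (Update u : changelog) {
            System.out.println(processor.process(u));
        }
    }
}
```

Only the last output per window corresponds to the Deviation column in the question; earlier outputs are intermediate results. Note also that, in this sketch as in the processor above, a late update to a window is stored but does not re-emit the deviation of the window that follows it.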