How can I implement a windowed rolling average with KSQL KTable correctly?


I am trying to implement a rolling average of volume in KSQL.

Kafka currently ingests data from a producer into the topic "KLINES". The data covers multiple markets and shares a consistent format. I then create a stream from that data like so:

CREATE STREAM KLINESTREAM (market VARCHAR, open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE, volume DOUBLE, start_time BIGINT, close_time BIGINT, event_time BIGINT) \
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='KLINES', TIMESTAMP='event_time', KEY='market');

I then create a table which calculates the average volume over the last 20 minutes for each market like so:

CREATE TABLE AVERAGE_VOLUME_TABLE_BY_MARKET AS \
SELECT CEIL(SUM(volume) / COUNT(*)) AS volume_avg, market FROM KLINESTREAM \
WINDOW HOPPING (SIZE 20 MINUTES, ADVANCE BY 5 SECONDS) \
GROUP BY market;
SELECT * FROM AVERAGE_VOLUME_TABLE_BY_MARKET LIMIT 1;

For clarity, the SELECT produces:

1560647412620 | EXAMPLEMARKET : Window{start=1560647410000 end=-} | 44.0 | EXAMPLEMARKET

What I want is a KSQL table that represents the current "kline" state of each market and also includes the rolling average volume calculated in the "AVERAGE_VOLUME_TABLE_BY_MARKET" KTable, so that I can compare the current volume with the rolling average.

I have tried to join like so:

SELECT K.market, K.open, K.high, K.low, K.close, K.volume, V.volume_avg \
FROM KLINESTREAM K \
LEFT JOIN AVERAGE_VOLUME_TABLE_BY_MARKET V \
ON K.market = V.market;

But obviously this results in an error, as the key of "AVERAGE_VOLUME_TABLE_BY_MARKET" includes the TimeWindow as well as the market:

A serializer (key:
    org.apache.kafka.streams.kstream.TimeWindowedSerializer) is not compatible to
    the actual key type (key type: java.lang.String). Change the default Serdes in
    StreamConfig or provide correct Serdes via method parameters.

Am I approaching this problem correctly?

What I want to achieve is:

Windowed Aggregate KTable + Kline Stream -> 
KTable representing current market state 
including average volume from the KTable

Is a table like this, showing the current market state, possible in KSQL? Or must I use KStreams or another library to accomplish this?

A great aggregation example is here: https://www.confluent.io/stream-processing-cookbook/ksql-recipes/aggregating-data

Applied to this example, how would I compare the aggregate with fresh data as it arrives in the KSQL table?

Cheers, James


Solution

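  • I believe what you're looking for may be LATEST_BY_OFFSET, which returns the most recent value of a column within each group. Aggregating it alongside the average puts the current volume and the rolling average in the same windowed table, so no join is needed: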

    CREATE TABLE AVERAGE_VOLUME_TABLE_BY_MARKET AS
      SELECT 
        market,
        LATEST_BY_OFFSET(volume) AS volume,
        CEIL(SUM(volume) / COUNT(*)) AS volume_avg 
      FROM KLINESTREAM
        WINDOW HOPPING (SIZE 20 MINUTES, ADVANCE BY 5 SECONDS)
      GROUP BY market;
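
To compare the current volume with the rolling average as new klines arrive, a push query over the table defined above might look like the following. This is only a sketch, assuming a ksqlDB version that supports EMIT CHANGES and exposes WINDOWSTART as a column on windowed tables. The aliases window_start and volume_ratio and the threshold of 2 are illustrative and do not come from the question.

```sql
-- Push query: emits a row whenever a window's aggregate is updated.
-- Assumes AVERAGE_VOLUME_TABLE_BY_MARKET was created with LATEST_BY_OFFSET as above.
SELECT
  market,
  WINDOWSTART AS window_start,          -- start of the 20-minute window, in epoch ms
  volume,                               -- latest volume seen in this window
  volume_avg,                           -- rolling average over this window
  volume / volume_avg AS volume_ratio   -- hypothetical alias
FROM AVERAGE_VOLUME_TABLE_BY_MARKET
WHERE volume > 2 * volume_avg           -- hypothetical threshold
EMIT CHANGES;
```

Note that with SIZE 20 MINUTES and ADVANCE BY 5 SECONDS, every event falls into 240 overlapping windows (1200 s / 5 s), so a single incoming kline can update up to 240 window rows for its market. A larger advance interval would reduce that fan-out if it becomes a problem.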