ksqldb

How to CREATE TABLE AS SELECT on a stream?


CREATE TABLE AS SELECT works only with a GROUP BY. How, then, can I set up a simple table in the database (no aggregations or groupings) that is used to enrich another stream with data? For example: there is a stream A with fields (product, city_id). We need a table (or something else) with fields (city_id, city_name) that is populated from another stream.

A further stream should then enrich stream A with the city name from that table through a join.

How can I set up this kind of data enrichment using an external lookup table?


Solution

  • You can use the LATEST_BY_OFFSET aggregation to build such a table. It keeps the most recent city_name seen for each city_id, so the result behaves like a plain lookup table.

    CREATE STREAM source_city_data 
      WITH (KAFKA_TOPIC='source_city_data', FORMAT='AVRO');
    
    CREATE TABLE city_data AS 
      SELECT city_id, LATEST_BY_OFFSET(city_name) AS city_name 
      FROM source_city_data
      GROUP BY city_id;
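
  • To then enrich stream A with the city name, a stream-table join on city_id is one option. The statement below is only a sketch: the names stream_a and stream_a_enriched are hypothetical, and it assumes stream_a is already registered in ksqlDB with a city_id column of the same type as the key of city_data.

    -- stream_a and stream_a_enriched are assumed names, not from the question.
    -- LEFT JOIN keeps events whose city_id has no entry in city_data yet;
    -- city_name is NULL for those rows.
    CREATE STREAM stream_a_enriched AS
      SELECT a.city_id AS city_id,
             a.product AS product,
             c.city_name AS city_name
      FROM stream_a a
      LEFT JOIN city_data c ON a.city_id = c.city_id
      EMIT CHANGES;

    The join works here because city_id is the primary key of city_data (it is the GROUP BY column), which is what ksqlDB requires for the table side of a stream-table join.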