CREATE TABLE AS SELECT only seems to work with a GROUP BY. How, then, can I build a plain lookup table in ksqlDB (no aggregations or groupings needed) to enrich another stream with data? As an example: there is a stream A with fields (product, city_id). We need a table (or something else) with fields (city_id, city_name) that is populated from another stream.
A further stream should then join stream A with that table to add the city name.
How can you organize data enrichment using an external reference table?
You can use the LATEST_BY_OFFSET aggregation to build such a table: group by city_id and keep the most recent city_name for each key.
CREATE STREAM source_city_data
WITH (KAFKA_TOPIC='source_city_data', FORMAT='AVRO');
CREATE TABLE city_data AS
SELECT city_id, LATEST_BY_OFFSET(city_name) AS city_name
FROM source_city_data
GROUP BY city_id;
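
With the table in place, the enrichment itself could be a stream-table join. The following is only a sketch: it assumes stream A is already registered in ksqlDB under the hypothetical name stream_a with columns product and city_id, and the output name stream_a_enriched is made up for illustration.

```sql
-- Assumption: stream_a (product, city_id) already exists; the name is hypothetical.
-- city_data is the table built above, keyed by city_id.
CREATE STREAM stream_a_enriched AS
  SELECT a.city_id AS city_id,
         a.product AS product,
         c.city_name AS city_name
  FROM stream_a a
  -- LEFT JOIN keeps events whose city_id is not yet in the table (city_name is NULL).
  LEFT JOIN city_data c ON a.city_id = c.city_id
  EMIT CHANGES;
```

Each event from stream_a is matched against the current row for its city_id at the time it is processed, so later updates to city_data affect only events that arrive after them.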