apache-kafka ksqldb

Create a table from a topic with a WHERE clause in ksqlDB


I'm using what I believe is the latest version of the ksqlDB server, 0.29.2. I'm trying to create a table that reads from a topic which receives many events, but I'm only interested in specific ones. Each JSON event has a property named "evenType", so I want to continuously filter the events and build a table that stores client data such as phone number and email, so that the client info stays up to date.

I created a stream called orders_inputs for testing purposes and then tried to create this table, but the statement fails with the error shown after it:

 create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
line 1:120: mismatched input 'as' expecting ';'
Statement: create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
Caused by: line 1:120: mismatched input 'as' expecting ';'
Caused by: org.antlr.v4.runtime.InputMismatchException

Solution

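  • If you want a table that contains the results of a SELECT query over a stream, use CREATE TABLE AS SELECT. That form does not accept a column list: once the parser has read the columns and the WITH clause it expects the statement to end, which is why it stops at 'as' and reports that it was expecting ';'.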

    https://docs.confluent.io/5.2.1/ksql/docs/developer-guide/create-a-table.html#create-a-ksql-table-with-streaming-query-results

    e.g.

    CREATE TABLE orders AS 
    SELECT orderid, itemid FROM orders_inputs 
    WHERE type='t1';
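
    Two details may need adjusting on a recent ksqlDB server such as the 0.29.x mentioned in the question. The WITH clause from the original statement can be kept, but it goes before AS. Also, as far as I know, ksqlDB only accepts CREATE TABLE AS SELECT over a stream when the query aggregates, so a plain filter like the one above may be rejected. A minimal sketch under those assumptions, keeping the latest itemid per orderid (the use of LATEST_BY_OFFSET and GROUP BY is my addition, not part of the original answer):

    -- No column list here; the schema is derived from the SELECT.
    CREATE TABLE orders
      WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) AS
    SELECT orderid,
           -- keep the most recent itemid seen for each orderid
           LATEST_BY_OFFSET(itemid) AS itemid
    FROM orders_inputs
    WHERE type='t1'
    -- the GROUP BY column becomes the table's primary key
    GROUP BY orderid;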
    

    You can specify the message key when creating the stream orders_inputs: https://docs.confluent.io/5.4.4/ksql/docs/developer-guide/syntax-reference.html#message-keys

    Otherwise, you can specify the primary key when creating a table from a topic: https://docs.confluent.io/5.2.1/ksql/docs/developer-guide/create-a-table.html#create-a-table-with-selected-columns

    e.g.

    CREATE TABLE orders
      (orderid VARCHAR PRIMARY KEY,
       itemid VARCHAR)
    WITH (KAFKA_TOPIC = 'orders',
          VALUE_FORMAT='JSON');
    

    However, you would then have to filter on type='t1' each time you query the table, which also means type has to be declared as a column of the table.
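
    A sketch of what that could look like, assuming the events carry a top-level type field and that orderid is the Kafka record key of the topic. The table name orders_all and the type column are hypothetical additions for illustration:

    -- Declare type so that queries can filter on it.
    CREATE TABLE orders_all
      (orderid VARCHAR PRIMARY KEY,
       itemid VARCHAR,
       type VARCHAR)
    WITH (KAFKA_TOPIC = 'orders',
          VALUE_FORMAT = 'JSON');

    -- Push query: emits matching rows as the table changes.
    SELECT orderid, itemid
    FROM orders_all
    WHERE type = 't1'
    EMIT CHANGES;

    The trade-off is that the filter lives in every query rather than in the table definition, so rows of other types are still part of the table.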