I'm using ksqlDB server 0.29.2, which I believe is the latest version. I'm trying to create a table that reads from a specific topic. The topic receives lots of events, but I'm only interested in some of them. Each JSON event has a property named "evenType", so I want to continuously filter the events and create a table that stores client data such as phone number and email, so that I can keep the client info up to date.
I created a stream called orders_inputs purely for testing, and then tried to create this table, but got the error below.
create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
line 1:120: mismatched input 'as' expecting ';'
Statement: create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
Caused by: line 1:120: mismatched input 'as' expecting ';'
Caused by: org.antlr.v4.runtime.InputMismatchException
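If you want to create a table containing the results of a SELECT query over a stream, you can use CREATE TABLE AS SELECT. This form does not take a column list; the columns come from the SELECT itself, which is why the parser expects ';' at the point where it finds 'as'.
e.g.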
CREATE TABLE orders AS
SELECT orderid, itemid FROM orders_inputs
WHERE type='t1';
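Applied to the statement in the question, that means dropping the column list and keeping the WITH clause before AS. One caveat that I have not verified on 0.29.2: ksqlDB generally rejects a table built from a stream unless the query aggregates, so the sketch below groups by orderid and keeps the most recent itemid with LATEST_BY_OFFSET. It assumes orders_inputs exposes orderid, itemid and type columns.

```sql
-- Sketch only: CREATE TABLE ... AS SELECT takes no column list;
-- the schema is derived from the query.
CREATE TABLE orders
  WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) AS
  SELECT orderid,
         LATEST_BY_OFFSET(itemid) AS itemid  -- keep the latest value per key
  FROM orders_inputs
  WHERE type='t1'
  GROUP BY orderid
  EMIT CHANGES;
```

The GROUP BY column (orderid) becomes the table's primary key, so it does not need to be declared separately.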
You can specify the key when creating the stream orders_inputs: https://docs.confluent.io/5.4.4/ksql/docs/developer-guide/syntax-reference.html#message-keys
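A minimal sketch of declaring the key on the stream in current ksqlDB syntax (the linked page describes an older release). The topic name orders_topic is a placeholder, not taken from the question, and the sketch assumes the order id is stored in the Kafka message key:

```sql
-- Hypothetical source topic name; adjust to the real topic.
CREATE STREAM orders_inputs (
  orderid VARCHAR KEY,   -- read from the Kafka message key
  itemid  VARCHAR,       -- read from the JSON value
  type    VARCHAR        -- read from the JSON value
) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');
```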
Otherwise, you can specify the primary key when creating a table directly from a topic: https://docs.confluent.io/5.2.1/ksql/docs/developer-guide/create-a-table.html#create-a-table-with-selected-columns
e.g.
CREATE TABLE orders
(orderid VARCHAR PRIMARY KEY,
itemid VARCHAR)
WITH (KAFKA_TOPIC = 'orders',
VALUE_FORMAT='JSON');
However, you would then have to query the table and filter with WHERE type='t1' yourself, which also means declaring type as a column of the table.
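As a sketch of that last step, assuming the orders table is declared with an additional type VARCHAR column, a filtered table could be derived from it. The name orders_t1 is made up for illustration:

```sql
-- Assumes orders was created with a `type VARCHAR` column.
-- Table-to-table CTAS; the primary key (orderid) carries over.
CREATE TABLE orders_t1 AS
  SELECT orderid, itemid
  FROM orders
  WHERE type='t1'
  EMIT CHANGES;
```

Because the source here is already a table, no GROUP BY is needed; rows that stop matching the filter are removed from orders_t1.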