apache-kafka ksqldb

Is it possible to concatenate two columns in ksqlDB in a CREATE STREAM statement?


I want to concatenate two columns and use the result as the timestamp field. I need to do this in a CREATE STREAM statement because I want to use that timestamp for a WINDOW TUMBLING calculation.

The SQL code:

create STREAM testStream (Price INT,concat(cast(RlcDate as varchar),LPAD(CAST(RlcTime AS VARCHAR),8,'0')) as timestamp, Id VARCHAR) WITH (kafka_topic='testTopic', partitions=1,value_format='JSON', timestamp = 'timestamp', timestamp_format = 'yyyyMMddHHmmssSS');

When I run this code, it returns the following error:

Caused by: line 1:42: mismatched input '(' expecting {'STRING', 'EMIT',
        'CHANGES', 'FINAL', 'ESCAPE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP',
        'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE',
        'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'TYPES', 'SHOW', 'TABLES',
        'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP',
        'SET', 'RESET', 'SESSION', 'DECIMAL', 'KEY', 'SINK', 'SOURCE', 'PRIMARY',
        'REPLACE', 'ASSERT', 'ADD', 'ALTER', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
        QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER, VARIABLE}
Caused by: org.antlr.v4.runtime.InputMismatchException

I want to know: is it valid to use CONCAT and CAST when creating a stream, or can these functions only be used in a SELECT statement?

If it's not possible, is there a workaround?


Solution

  • CREATE STREAM statements define how a stream looks in terms of fields, data types, and constraints. It is not the right place to define calculation logic, i.e. how the fields should be filled. The right place to do this is in a SELECT statement, as you correctly assumed.

    Try to first define your stream based on what is in your Kafka topic:

    CREATE STREAM testStream (Price INT, RlcDate VARCHAR, RlcTime VARCHAR, Id VARCHAR) 
    WITH (kafka_topic='testTopic', partitions=1,value_format='JSON');
    

    This stream is an alternative, queryable representation of your existing Kafka topic. Now you can select data according to your criteria and continuously write the results of that query into a new stream (which in turn needs a backing Kafka topic):

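    -- Identifiers are left unquoted on purpose: ksqlDB stores unquoted names in
    -- upper case, whereas backtick-quoted names are case-sensitive and would not
    -- match the columns declared (unquoted) in testStream.
    CREATE STREAM testStreamResult WITH (kafka_topic='testTopicResult', partitions=1, value_format='JSON', timestamp='CONCAT_TIMESTAMP', timestamp_format='yyyyMMddHHmmssSS')
    AS SELECT
      Price,
      RlcDate,
      RlcTime,
      CONCAT(RlcDate, LPAD(RlcTime, 8, '0')) AS concat_timestamp,
      Id
    FROM testStream
    EMIT CHANGES;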
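
    One way to check that the concatenated value is really being picked up as the record timestamp is to look at the ROWTIME pseudo-column of the new stream. The following is only a sketch: the output format string and the LIMIT are arbitrary choices for illustration, and the SET line is needed only if you want to read records that were already in the topic.

    ```sql
    -- Read from the beginning of the topic instead of only new records.
    SET 'auto.offset.reset' = 'earliest';

    -- ROWTIME is the record timestamp in epoch milliseconds;
    -- TIMESTAMPTOSTRING renders it in a readable form.
    -- The alias event_time is a name chosen for this example.
    SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS event_time
    FROM testStreamResult
    EMIT CHANGES
    LIMIT 5;
    ```

    If the printed times correspond to your RlcDate and RlcTime values rather than to the time the records were produced, the timestamp and timestamp_format properties are working as intended.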
    

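    If the concatenation logic matches your incoming data, testStreamResult uses the concatenated value as its record timestamp, so you can use it in time-based operations such as a WINDOW TUMBLING aggregation or a JOIN with the timestamp semantics you are looking for.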
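
    As a rough sketch of the windowed step mentioned in the question, a tumbling-window aggregation over testStreamResult could look like the following. The table name priceByWindow, the one-minute window size, and the two aggregates are assumptions made for illustration, not part of the original question. It also assumes the columns of testStreamResult were created with unquoted names, so that Id and Price resolve.

    ```sql
    -- Hypothetical aggregation: count records and sum Price per Id
    -- in non-overlapping one-minute windows based on the record timestamp.
    CREATE TABLE priceByWindow AS
    SELECT
      Id,
      COUNT(*) AS trade_count,
      SUM(Price) AS total_price
    FROM testStreamResult
    WINDOW TUMBLING (SIZE 1 MINUTE)
    GROUP BY Id
    EMIT CHANGES;
    ```

    Because the windows are assigned from the record timestamp, records are grouped by the time encoded in RlcDate and RlcTime rather than by their arrival time in Kafka.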