
KSQLDB: Using CREATE STREAM AS SELECT with Differing KEY SCHEMAS


Here is the problem statement:

STREAM_SUMMARY: A stream with one of the value columns as an ARRAY-of-STRUCTS.

Name : STREAM_SUMMARY
 Field           | Type
---------------------------------------------------------------------------------------------------
 ROWKEY          | STRUCT<assessment_id VARCHAR(STRING), institution_id INTEGER> (key)
 assessment_id   | VARCHAR(STRING)
 institution_id  | INTEGER
 responses       | ARRAY<STRUCT<student_id INTEGER, question_id INTEGER, response VARCHAR(STRING)>>
---------------------------------------------------------------------------------------------------

STREAM_DETAIL: A stream to be created from STREAM_SUMMARY by "exploding" the array-of-structs into separate rows. Note that the key schema is also different.

Below are the key and value schemas I want to end up with:

Name : STREAM_DETAIL
 Field           | Type
---------------------------------------------------------------------------------------------------
 ROWKEY          | STRUCT<assessment_id VARCHAR(STRING), student_id INTEGER, question_id INTEGER> (key)
 assessment_id   | VARCHAR(STRING)
 institution_id  | INTEGER
 student_id      | INTEGER
 question_id     | INTEGER
 response        | VARCHAR(STRING)
---------------------------------------------------------------------------------------------------

My objective is to create the STREAM_DETAIL from the STREAM_SUMMARY.

I tried the following:

CREATE STREAM STREAM_DETAIL WITH (
    KAFKA_TOPIC = 'stream_detail'
) AS 
SELECT 
    STRUCT (
        `assessment_id` := "assessment_id",
        `student_id` := EXPLODE("responses")->"student_id",
        `question_id` := EXPLODE("responses")->"question_id"
      )
, "assessment_id"
, "institution_id"
, EXPLODE("responses")->"student_id"
, EXPLODE("responses")->"question_id"
, EXPLODE("responses")->"response"
FROM STREAM_SUMMARY
EMIT CHANGES;

While the SELECT query on its own works fine, the CREATE STREAM statement fails with the following error: "Key missing from projection."

If I add the ROWKEY column to the SELECT clause of the statement above, it works; however, the key schema of the resulting stream is then the same as the original stream's key. The key schema I want in the new stream is: STRUCT<assessment_id VARCHAR(STRING), student_id INTEGER, question_id INTEGER> (key)

Alternatively, I tried creating STREAM_DETAIL by hand (with a plain CREATE STREAM statement that supplies the key and value schema IDs) and then populating it with the INSERT INTO approach:

INSERT INTO STREAM_DETAIL 
SELECT ....
FROM STREAM_SUMMARY
EMIT CHANGES;

I got the same error.

How can I derive a new stream from an existing one, but with a different key schema? The new key schema is important because I sync the underlying topic to a database via a Kafka sink connector, and the connector needs the key in this shape to perform an UPSERT.

I am stuck on this and would appreciate any help.
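For context, an upsert keyed on the record key could be declared from ksqlDB roughly as follows. This is a hypothetical sketch, not the asker's actual configuration: it assumes the Confluent JDBC sink connector is installed, keys and values are Avro-serialized, and the database and Schema Registry URLs are placeholders; the connector name stream_detail_jdbc_sink is made up.

```sql
-- Hypothetical sketch: assumes the Confluent JDBC sink connector, Avro keys and
-- values, and placeholder URLs (db-host, schema-registry are not real hosts).
-- insert.mode = 'upsert' with pk.mode = 'record_key' makes the connector build
-- the table's primary key from the pk.fields entries, read from the record key.
CREATE SINK CONNECTOR stream_detail_jdbc_sink WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://db-host:5432/mydb',
    'topics'                              = 'stream_detail',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'insert.mode'                         = 'upsert',
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'assessment_id,student_id,question_id',
    'auto.create'                         = 'true'
);
```

With a primitive (non-struct) key, pk.fields could name only a single column; it is the struct-shaped key that lets all three fields form a composite primary key, which is why the key schema matters here.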


Solution

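  • You can't change the key of a stream just by projecting a new key expression in a CREATE STREAM AS SELECT; ksqlDB expects the existing key column in the projection, which is why you get "Key missing from projection."
    But there is a different approach to the problem.

    What you want is a re-key, and for that you can use a ksqlDB table. It can be solved like this: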

    -- Step 1: one output row per element of the responses array
    CREATE STREAM IF NOT EXISTS INTERMEDIATE_STREAM_SUMMARY_FLATTENED AS
    SELECT
        ROWKEY,
        EXPLODE(`responses`) as response  -- `responses` is a lower-case column, so it must be quoted
    FROM STREAM_SUMMARY;

    -- Step 2: re-key by grouping on the new key fields
    CREATE TABLE IF NOT EXISTS STREAM_DETAIL AS -- This also creates an underlying topic
    SELECT
        ROWKEY -> `assessment_id` as `assessment_id`,
        response -> `student_id` as `student_id`,
        response -> `question_id` as `question_id`,
        -- non-key columns must be aggregated; keep the latest non-null value per key
        LATEST_BY_OFFSET(ROWKEY -> `institution_id`, false) as `institution_id`,
        LATEST_BY_OFFSET(response -> `response`, false) as `response`
    FROM INTERMEDIATE_STREAM_SUMMARY_FLATTENED
    GROUP BY ROWKEY -> `assessment_id`, response -> `student_id`, response -> `question_id`;
    

    The key schema will be STRUCT<assessment_id VARCHAR(STRING), student_id INTEGER, question_id INTEGER>; you can check Schema Registry or print the topic to confirm it. In ksqlDB, DESCRIBE on the table shows a flat key (three separate key columns rather than one struct), but don't panic: that is just how ksqlDB displays a multi-column key.

    I have used a similar setup and synced the final topic to a database.
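To check the key schema as described above, the statements below are one way to inspect the result from the ksqlDB CLI. This is a sketch that assumes the table's backing topic kept its default name, which for a CREATE TABLE AS SELECT without KAFKA_TOPIC is the table name, STREAM_DETAIL. With an Avro key format, the key schema would typically be registered in Schema Registry under the subject STREAM_DETAIL-key.

```sql
-- Lists the table's columns; the three GROUP BY columns show up as separate
-- KEY columns, which is the "flat key" view mentioned above.
DESCRIBE STREAM_DETAIL;

-- Prints a few records from the backing topic, including each record's key
-- (assumes the default topic name STREAM_DETAIL).
PRINT 'STREAM_DETAIL' FROM BEGINNING LIMIT 5;
```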