Here is the description of the problem statement:
STREAM_SUMMARY: A stream with one of the value columns as an ARRAY-of-STRUCTS.
Name : STREAM_SUMMARY
Field | Type
------------------------------------------------------------------------------------------------------------------------------------------------
ROWKEY | STRUCT<asessment_id VARCHAR(STRING), institution_id INTEGER> (key)
assessment_id | VARCHAR(STRING)
institution_id | INTEGER
responses | ARRAY<STRUCT<student_id INTEGER, question_id INTEGER, response VARCHAR(STRING)>>
------------------------------------------------------------------------------------------------------------------------------------------------
STREAM_DETAIL: This is a stream to be created from STREAM_SUMMARY by "exploding" the array-of-structs into separate rows. Note that the KEY schema is also different.
Below is the Key and Value schema I want to achieve (end state)...
Name : STREAM_DETAIL
Field | Type
-------------------------------------------------------------------------------------------------------
ROWKEY | STRUCT<asessment_id VARCHAR(STRING), student_id INTEGER, question_id INTEGER> (key)
assessment_id | VARCHAR(STRING)
institution_id | INTEGER
student_id | INTEGER
question_id | INTEGER
response | VARCHAR(STRING)
My objective is to create the STREAM_DETAIL from the STREAM_SUMMARY.
I tried the below:
CREATE STREAM STREAM_DETAIL WITH (
    KAFKA_TOPIC = 'stream_detail'
) AS
SELECT
    STRUCT (
        `assessment_id` := "assessment_id",
        `student_id` := EXPLODE("responses")->"student_id",
        `question_id` := EXPLODE("responses")->"question_id"
    )
    , "assessment_id"
    , "institution_id"
    , EXPLODE("responses")->"student_id"
    , EXPLODE("responses")->"question_id"
    , EXPLODE("responses")->"response"
FROM STREAM_SUMMARY
EMIT CHANGES;
While the SELECT query works fine on its own, the CREATE STREAM failed with the following error: "Key missing from projection."
If I add the ROWKEY column to the SELECT clause in the above statement, things work; however, the KEY schema of the resulting STREAM is the same as the original STREAM's key. The "Key" schema that I want in the new STREAM is: STRUCT<asessment_id VARCHAR(STRING), student_id INTEGER, question_id INTEGER> (key)
Alternatively, I tried creating STREAM_DETAIL by hand (using a plain CREATE STREAM statement and providing the key and value SCHEMA_IDs). After that I tried the INSERT INTO approach:
INSERT INTO STREAM_DETAIL
SELECT ....
FROM STREAM_SUMMARY
EMIT CHANGES;
The errors were the same.
Can you please advise how I can enrich a STREAM while giving it a different Key schema? A new/different Key schema is important for me because the underlying topic is synced to a database via a Kafka sink connector, and the sink connector needs the key schema in this form for me to be able to do an UPSERT.
I have not been able to get past this and would appreciate your help.
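You can't change the key of a stream just by adding a new STRUCT to the SELECT list: a stream created from another stream keeps the source stream's key, which is why the key column has to appear in the projection.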
But there is a different approach to the problem.
What you want is a re-key, and to do that you can use a ksqlDB table. It can be solved like this:
-- Step 1: explode the array so that each response struct becomes its own row.
-- The source key (ROWKEY) has to stay in the projection.
CREATE STREAM IF NOT EXISTS INTERMEDIATE_STREAM_SUMMARY_FLATTNED AS
    SELECT
        ROWKEY,
        EXPLODE(`responses`) AS response
    FROM STREAM_SUMMARY;
-- Step 2: re-key by grouping on the columns that should make up the new key.
CREATE TABLE IF NOT EXISTS STREAM_DETAIL AS -- This also creates an underlying topic
    SELECT
        ROWKEY -> `assessment_id` AS `assessment_id`,
        response -> `student_id` AS `student_id`,
        response -> `question_id` AS `question_id`,
        LATEST_BY_OFFSET(ROWKEY -> `institution_id`, false) AS `institution_id`,
        LATEST_BY_OFFSET(response -> `response`, false) AS `response`
    FROM INTERMEDIATE_STREAM_SUMMARY_FLATTNED
    GROUP BY ROWKEY -> `assessment_id`, response -> `student_id`, response -> `question_id`;
The key schema will be STRUCT<asessment_id VARCHAR(STRING), student_id INTEGER, question_id INTEGER>; you can check Schema Registry or print the topic to validate that. In ksqlDB, describing the table will show you a flat key (separate key columns), but don't panic.
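As a rough sketch of that check, the following could be run from the ksqlDB CLI. It assumes the table's backing topic got the default name STREAM_DETAIL (no KAFKA_TOPIC was set in the CREATE TABLE statement); adjust the topic name if yours differs.

```sql
-- Assumption: the backing topic is named after the table (STREAM_DETAIL).
-- Print a few records, including their keys, straight from the topic.
PRINT 'STREAM_DETAIL' FROM BEGINNING LIMIT 5;

-- Show the columns as ksqlDB sees them; key columns are listed individually.
DESCRIBE STREAM_DETAIL;
```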
I have used a similar approach and synced the final topic to a database.
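For the database sync itself, a sink connector can also be declared from ksqlDB. The sketch below is only an illustration: it assumes the Confluent JDBC sink connector is installed and that ksqlDB is attached to a Kafka Connect cluster. The connector name, connection URL, and credentials are hypothetical placeholders, and the topic name again assumes the default STREAM_DETAIL.

```sql
-- Hypothetical connector name, URL, and credentials: replace with your own.
-- insert.mode = upsert updates the row when the primary key already exists.
-- pk.mode = record_key takes the primary key fields from the record key.
CREATE SINK CONNECTOR STREAM_DETAIL_JDBC_SINK WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://db-host:5432/assessments',
    'connection.user'     = 'db_user',
    'connection.password' = 'db_password',
    'topics'              = 'STREAM_DETAIL',
    'insert.mode'         = 'upsert',
    'pk.mode'             = 'record_key',
    'pk.fields'           = 'assessment_id,student_id,question_id'
);
```

With pk.mode set to record_key, the connector reads the primary key fields from the struct key, which is why the key schema of the topic matters for the UPSERT.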