The INPUT_TOPIC topic in Kafka receives data messages that all follow the same JSON schema:
{
  "year": {
    "month": {
      "day": {
        "hour": string
      }
    }
  }
}
First I create a Kafka stream with a ksql statement. This stream reads the data messages posted to INPUT_TOPIC:
CREATE OR REPLACE STREAM INPUT_STREAM (year STRUCT<month STRUCT<day STRUCT<hour VARCHAR>>>) WITH (KAFKA_TOPIC = 'INPUT_TOPIC', VALUE_FORMAT = 'JSON');
Using the Confluent Kafka web UI, I confirm that INPUT_STREAM was created properly, with a schema that matches data messages of the form {year: {month: {day: {hour: string}}}}.
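The same check can be done from the ksqlDB CLI, assuming you have access to it. DESCRIBE lists the stream's columns and their types, so the nested STRUCT definition should appear there:

```sql
-- List the columns ksqlDB registered for the stream, including the nested STRUCT type of YEAR
DESCRIBE INPUT_STREAM;
```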
Next I need to create a second stream, OUTPUT_STREAM, that reads the data messages from INPUT_STREAM. It will remap the original JSON messages by removing the top-level year field, so instead of being 4 levels deep, the resulting messages will be 3 levels deep (1. month > 2. day > 3. hour):
{
  "month": {
    "day": {
      "hour": string
    }
  }
}
OUTPUT_STREAM will write the remapped JSON messages to its own OUTPUT_TOPIC, so the message schema for this stream should be 3 levels deep, as shown above.
So the entire setup is made of two topics and two streams:
INPUT_TOPIC > INPUT_STREAM > OUTPUT_STREAM > OUTPUT_TOPIC
I put together the ksql statement below. It attempts to remap YEAR->MONTH->DAY->HOUR as STRUCT<MONTH STRUCT<DAY STRUCT<HOUR VARCHAR>>>:
CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS SELECT YEAR->MONTH->DAY->HOUR HOUR STRUCT<MONTH STRUCT<DAY STRUCT<HOUR VARCHAR>>> FROM INPUT_DATA_STREAM EMIT CHANGES;
But it fails with the error: Line 1:8: no viable alternative at input
Is it possible to remap the values from the input data to a different JSON schema using ksql and streams or tables?
SELECT YEAR->MONTH->DAY->HOUR selects only the innermost hour string field.
To select the month struct, you only need SELECT YEAR->MONTH, and that brings along the nested data (day and hour) within that struct.
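The output schema is inferred from the source stream, so you don't need to redefine it: in a CREATE STREAM ... AS SELECT, the SELECT list takes expressions and optional aliases, not type declarations such as STRUCT<...>.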
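Putting this together, a minimal sketch of the corrected statement might look like the following. It assumes the source is the INPUT_STREAM defined earlier (the failing query referred to INPUT_DATA_STREAM), and the MONTH alias is an illustrative choice rather than something ksqlDB requires:

```sql
-- Sketch: re-key each record on its month struct, dropping the top-level year wrapper.
-- No VALUE_FORMAT is given, so the format of INPUT_STREAM (JSON) is inherited.
CREATE STREAM OUTPUT_STREAM
  WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS
  SELECT YEAR->MONTH AS MONTH   -- whole month struct; its day and hour fields come along
  FROM INPUT_STREAM
  EMIT CHANGES;
```

Because only one column is selected, ksqlDB should by default wrap it in a JSON object, so records in OUTPUT_TOPIC would look roughly like {"MONTH": {"DAY": {"HOUR": "..."}}}. Unquoted identifiers are upper-cased by ksqlDB; if lowercase keys matter, backtick-quoted identifiers (e.g. `month`) preserve case, which is worth checking against your ksqlDB version.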