
How to remap JSON with ksql in Kafka


The Kafka topic INPUT_TOPIC receives data messages that all follow the same JSON schema:

{
  "year": {
    "month": {
      "day": {
        "hour": string
      }
    }
  }
}

First, I create a stream with a ksql CREATE STREAM statement. This stream reads the data messages posted to INPUT_TOPIC:

CREATE OR REPLACE STREAM INPUT_STREAM (year STRUCT<month STRUCT<day STRUCT<hour VARCHAR>>>) WITH (KAFKA_TOPIC = 'INPUT_TOPIC', VALUE_FORMAT = 'JSON');

Using the Confluent Kafka web UI, I confirmed that INPUT_STREAM was created with a schema that matches the data messages:

{year: {month: {day: {hour: string}}}}
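The same check can also be done from the ksqlDB CLI instead of the web UI. A minimal sketch, assuming the stream name INPUT_STREAM from the statement above: DESCRIBE lists the registered columns, and a transient push query confirms that the -> operator can reach the innermost field.

```sql
-- List the columns ksqlDB registered for the stream (YEAR with its nested STRUCT type).
DESCRIBE INPUT_STREAM;

-- Read from the start of the topic so messages that already exist are included below.
SET 'auto.offset.reset' = 'earliest';

-- Transient push query: dereference the nested structs down to the innermost HOUR value.
SELECT YEAR->MONTH->DAY->HOUR AS HOUR
FROM INPUT_STREAM
EMIT CHANGES
LIMIT 5;
```

The push query is not persisted; it stops after five rows (or when cancelled) and creates no topic.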


Next, I need to create a second stream, OUTPUT_STREAM, that reads the data messages from INPUT_STREAM and remaps them by removing the top-level year field. Instead of being 4 levels deep, the resulting messages will be 3 levels deep (month > day > hour):

{
  "month": {
    "day": {
      "hour": string
    }
  }
}

OUTPUT_STREAM will write the remapped JSON messages to its own topic, OUTPUT_TOPIC, so the message schema for this stream should be 3 levels deep.
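Once OUTPUT_STREAM exists, the records that actually land in OUTPUT_TOPIC can be inspected with the ksqlDB CLI's PRINT command. A minimal sketch, assuming the topic name OUTPUT_TOPIC from this setup:

```sql
-- Dump raw records from the output topic, starting at the earliest offset, and stop after 5.
PRINT 'OUTPUT_TOPIC' FROM BEGINNING LIMIT 5;
```

If the remapping worked, each printed value should be a month object with day and hour nested inside it, with no year wrapper.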

So the entire setup consists of two topics and two streams:

INPUT_TOPIC > INPUT_STREAM > OUTPUT_STREAM > OUTPUT_TOPIC

I then put together the ksql statement below. It attempts to remap YEAR->MONTH->DAY->HOUR as STRUCT<MONTH STRUCT<DAY STRUCT<HOUR VARCHAR>>>:

CREATE STREAM OUTPUT_STREAM WITH (KAFKA_TOPIC='OUTPUT_TOPIC', REPLICAS=3) AS SELECT YEAR->MONTH->DAY->HOUR HOUR STRUCT<MONTH STRUCT<DAY STRUCT<HOUR VARCHAR>>> FROM INPUT_DATA_STREAM EMIT CHANGES;

But it fails with the error: Line 1:8: no viable alternative at input

Is it possible to remap the values from the input data to a different JSON schema using ksql streams or tables?


Solution

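  • SELECT YEAR->MONTH->DAY->HOUR selects only the innermost hour string field, not the structs around it.

    To select the month struct, you only need SELECT YEAR->MONTH; that brings along the nested day and hour data inside that struct.

    The output schema is inferred from the source stream, so you don't need to redeclare it. Type declarations such as STRUCT<...> belong in a CREATE STREAM column list, not in a SELECT projection, which is most likely what triggers the parse error. Also note that the FROM clause should name INPUT_STREAM, the stream created earlier, rather than INPUT_DATA_STREAM.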
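Putting the answer together, here is a minimal sketch of the corrected statement. It assumes the source is INPUT_STREAM as created in the question and keeps the OUTPUT_TOPIC and REPLICAS=3 settings from the failing attempt. The second statement is an optional variant, using the hypothetical names OUTPUT_STREAM_REBUILT and OUTPUT_TOPIC_REBUILT, that shows how the STRUCT constructor can build an arbitrary nesting when projecting an existing struct is not enough.

```sql
-- Drop the top-level YEAR wrapper by projecting the MONTH struct.
-- The nested DAY and HOUR fields are carried along inside MONTH; no type declaration is needed.
CREATE STREAM OUTPUT_STREAM
  WITH (KAFKA_TOPIC = 'OUTPUT_TOPIC', VALUE_FORMAT = 'JSON', REPLICAS = 3) AS
  SELECT YEAR->MONTH AS MONTH
  FROM INPUT_STREAM
  EMIT CHANGES;

-- Optional variant (hypothetical stream/topic names): rebuild the nesting explicitly
-- with the STRUCT constructor, which also allows renaming or reordering fields.
CREATE STREAM OUTPUT_STREAM_REBUILT
  WITH (KAFKA_TOPIC = 'OUTPUT_TOPIC_REBUILT') AS
  SELECT STRUCT(DAY := STRUCT(HOUR := YEAR->MONTH->DAY->HOUR)) AS MONTH
  FROM INPUT_STREAM
  EMIT CHANGES;
```

ksqlDB upper-cases unquoted identifiers, so the emitted JSON will most likely use MONTH, DAY and HOUR as field names. If the lowercase names from the input must be preserved, the fields would need to be declared with backtick-quoted identifiers (for example `month`) when INPUT_STREAM is created.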