apache-kafka, apache-flink, flink-streaming, flink-sql, pyflink

Flink DDL cannot parse 2 different JSON roots from a Kafka topic


I am sending messages from Kafka to Flink in Python. I have two different JSON roots in one Kafka topic. Examples of the two roots:

1- {"Message1": {"b": "c"}}

2- {"Message2": {"e": "f"}}

Flink can consume these messages, but it cannot parse them with this DDL:

    CREATE TABLE audienceInput (
      `messageKey` VARBINARY,
      `message` VARBINARY,
      `topic` VARCHAR
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'mytopic',
      'properties.bootstrap.servers' = '****:9092',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'json'
    )

How can I parse two root types of messages in Flink DDL?


Solution

  • You can declare the two roots, Message1 and Message2, directly as fields in the table schema.

    CREATE TABLE audience_input (
      Message1 ROW(b STRING),
      Message2 ROW(e STRING),
      ...
    ) WITH (
      ...
      'value.format' = 'json'
    )
    

    Message1 and Message2 will then appear as columns with nested fields in the resulting table; by default, a record that contains only one of the roots simply leaves the other column NULL. A fuller sketch follows after the documentation link below.

    See the JSON format documentation for more details about the json format and its options.
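
    As a minimal sketch, a complete version of that DDL could look like the following. The connector options are copied from the question, and the STRING types of the nested fields b and e are assumptions based on the example payloads, so adjust them to your real schema. The SELECT shows how the nested fields can be read back with dot notation on the ROW columns.

        CREATE TABLE audience_input (
          Message1 ROW(b STRING),   -- populated by records like {"Message1": {"b": "..."}}
          Message2 ROW(e STRING)    -- populated by records like {"Message2": {"e": "..."}}
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'mytopic',
          'properties.bootstrap.servers' = '****:9092',
          'scan.startup.mode' = 'earliest-offset',
          'value.format' = 'json'
        );

        -- Read the nested fields back. With the JSON format's default
        -- 'json.fail-on-missing-field' = 'false', the root that is absent
        -- in a given record simply comes back as NULL.
        SELECT
          audience_input.Message1.b AS b,
          audience_input.Message2.e AS e
        FROM audience_input;

    If some records in the topic may contain malformed JSON, adding 'json.ignore-parse-errors' = 'true' to the WITH clause makes the format skip those records instead of failing the job.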