apache-flink flink-sql pyflink

Querying nested row with Python in Flink


Following the PyFlink walkthrough, I'm now trying to get a simple nested row query working with apache-flink==1.14.4. I based my table structure on this solution: Get nested fields from Kafka message using Apache Flink SQL

A message looks like this:

{"signature": {"token": "abcd1234"}}

The relevant part of the code looks like this:

create_kafka_source_ddl = """
    CREATE TABLE nested_msg (
        `signature` ROW (
            `token` STRING
        )
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'nested_msg',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'nested-msg',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
"""

create_es_sink_ddl = """
    CREATE TABLE es_sink (
        token STRING
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://elasticsearch:9200',
        'index' = 'nested_count_1',
        'document-id.key-delimiter' = '$',
        'sink.bulk-flush.max-size' = '42mb',
        'sink.bulk-flush.max-actions' = '32',
        'sink.bulk-flush.interval' = '1000',
        'sink.bulk-flush.backoff.delay' = '1000',
        'format' = 'json'
    )
"""

t_env.execute_sql(create_kafka_source_ddl)
t_env.execute_sql(create_es_sink_ddl)
# How do I select the nested field here?
t_env.from_path("nested_msg").select(col("signature.token").alias("token")).select(
    "token"
).execute_insert("es_sink")

I've tried numerous variations here without success. The exception is:

py4j.protocol.Py4JJavaError: An error occurred while calling o48.select.
: org.apache.flink.table.api.ValidationException: Cannot resolve field [signature.token], input field list:[signature].

How can I select a nested field like this in order to insert it into my sink?


Solution

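  • Change col("signature.token") to col("signature").get('token'). As the error message shows, col() looks for a single field literally named signature.token; get() instead accesses the token field inside the signature row.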
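
For reference, here is a minimal sketch of how the corrected insert might look, assuming the nested_msg and es_sink tables from the question are already registered on t_env. The function names and the INSERT_TOKEN_SQL constant are hypothetical helpers introduced for illustration. The second function is a swapped-in alternative that uses Flink SQL's dotted access to row fields instead of the Table API, and has not been checked against this exact setup.

```python
# SQL alternative (an assumption, not part of the original answer):
# Flink SQL can usually address a field of a ROW column with dot notation.
INSERT_TOKEN_SQL = """
    INSERT INTO es_sink
    SELECT `signature`.`token` AS `token`
    FROM nested_msg
"""


def insert_tokens_with_table_api(t_env):
    """Table API version: read the nested field with Expression.get()."""
    # Imported here so this sketch can be loaded without apache-flink installed.
    from pyflink.table.expressions import col

    return (
        t_env.from_path("nested_msg")
        # col("signature") resolves the ROW column; get("token") reads its field.
        .select(col("signature").get("token").alias("token"))
        .execute_insert("es_sink")
    )


def insert_tokens_with_sql(t_env):
    """SQL version: submit the INSERT statement defined above."""
    return t_env.execute_sql(INSERT_TOKEN_SQL)
```

Either function would replace the last statement in the question's code, for example insert_tokens_with_table_api(t_env). The extra select("token") in the original is not needed once the alias is in place.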