
How to use table.where() to filter for subfields in PyFlink?


I'm using PyFlink with Flink 1.11.2, and I've defined my table like this:

def _create_sink_table(st_env):
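    # Create the Kafka-backed table `in`; column `d` is a nested ROW with fields `e` and `f`.
    # SINK_TOPIC_NAME and BOOTSTRAP_SERVERS are constants defined elsewhere in the script.
    # `in` is backtick-quoted because IN is a reserved keyword in Flink SQL.
    st_env.execute_sql(f"""
        CREATE TABLE `in` (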
            `a` STRING,
            `b` STRING,
            `c` STRING,
            `d` ROW(
                `e` STRING,
                `f` STRING
            )
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{SINK_TOPIC_NAME}',
            'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true'
        )
    """)

I want to use table.where() to filter on the nested subfield d.e:

from pyflink.table import StreamTableEnvironment

def execute(st_env: StreamTableEnvironment):
    table = st_env.from_path("in")
    table = table.select("a, b, c, d")
    table = table.where("d.e = 'fail'")

Everything I've tried so far has resulted in an error; this is one of them:

➜  ./run.sh
Traceback (most recent call last):
  File "/tmp/test.py", line 143, in <module>
    main()
  File "/tmp/test.py", line 139, in main
    execute(st_env)
  File "/tmp/test.py", line 109, in execute
    table = table.where("d.e = 'fail'")
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table.py", line 140, in where
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.where.
: org.apache.flink.table.api.ValidationException: Undefined function: e
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:49)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:49)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:38)
    at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
    at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:65)
    at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$resolveChildren$1(LookupCallResolver.java:64)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)

Flink tries to call a function named e on d instead of retrieving the subfield.
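Judging from the error, the Flink 1.11 string expression parser reads d.e as a call to a function named e with d as its argument, so the resolver looks e up as a function. Fields of a composite type such as ROW are reached with get() instead, for example d.get('e'). The sketch below is a hypothetical helper (to_get_expression is not a PyFlink API) that rewrites a dotted path into chained get() calls; it only builds a string, so it runs without Flink. That chained get() calls work for deeper nesting is an assumption worth verifying on your version.

```python
def to_get_expression(path: str) -> str:
    """Rewrite a dotted path such as "d.e" into "d.get('e')".

    Hypothetical helper: Flink 1.11's string expressions parse "d.e" as a
    call to a function named e, so nested ROW fields are reached with get().
    """
    root, *fields = path.split(".")
    if not root or any(not name for name in fields):
        raise ValueError(f"invalid field path: {path!r}")
    expr = root
    for name in fields:
        # Each nesting level becomes one chained get('<field>') call (assumed to chain).
        expr += f".get('{name}')"
    return expr


print(to_get_expression("d.e"))  # d.get('e')

# With PyFlink, the result would be embedded in the filter string:
# table = table.where(f"{to_get_expression('d.e')} = 'fail'")
```

On PyFlink 1.12 or later, the Python expression DSL should make the string form unnecessary: something like table.where(col('d').get('e') == 'fail'), with col imported from pyflink.table.expressions. Check the Expression API reference for your release before relying on it.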


Solution

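  • I think table.where("d.get(0) = 'fail'") may work: get() accesses a field of a composite type such as ROW by index, and index 0 is e, the first field of d. Accessing the field by name, as in table.where("d.get('e') = 'fail'"), should also work.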
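The same filter can also be expressed in SQL, where a ROW field is addressed with plain dot notation, so d.e works there even though it fails in the 1.11 string expression. The sketch assumes the table in and the columns from the question; build_filter_query and quote_identifier are hypothetical helpers that backtick-quote every identifier (needed for in, since IN is a reserved keyword) and double any single quote in the compared value. Escaping a backtick inside a quoted identifier by doubling it is an assumption based on Calcite's quoting rules.

```python
from typing import List


def quote_identifier(name: str) -> str:
    # Backtick-quote an identifier; a literal backtick is doubled (assumed
    # to match Flink/Calcite's escaping rule for quoted identifiers).
    return "`" + name.replace("`", "``") + "`"


def build_filter_query(table: str, columns: List[str], field_path: str, value: str) -> str:
    """Build a Flink SQL query that filters on a nested ROW field.

    Hypothetical helper: field_path is a dotted path such as "d.e"; value is
    compared as a string literal, with single quotes doubled to escape them.
    """
    select_list = ", ".join(quote_identifier(c) for c in columns)
    # In SQL, nested ROW fields use dot notation: `d`.`e`
    field = ".".join(quote_identifier(part) for part in field_path.split("."))
    literal = "'" + value.replace("'", "''") + "'"
    return f"SELECT {select_list} FROM {quote_identifier(table)} WHERE {field} = {literal}"


query = build_filter_query("in", ["a", "b", "c", "d"], "d.e", "fail")
print(query)
# SELECT `a`, `b`, `c`, `d` FROM `in` WHERE `d`.`e` = 'fail'

# With PyFlink and the table from the question registered:
# filtered = st_env.sql_query(query)
```

The resulting string can be passed to st_env.sql_query(query), which returns a Table that can be processed further with the Table API.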