apache-flink pyflink

Py4JJavaError in PyFlink Table API


This code converts a pandas DataFrame to a Flink table, applies some transformations, and then converts the result back to pandas. It works fine when I use the two filters followed by select, but it gives me an error when I add group_by and order_by.

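import pandas as pd
import numpy as np
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment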

f_s_env = StreamExecutionEnvironment.get_execution_environment()
f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)

df = pd.read_csv("dataBase/New/candidate.csv")

col = ['candidate_id', 'candidate_source_id', 'candidate_first_name',
       'candidate_middle_name', 'candidate_last_name', 'candidate_email',
       'created_date', 'last_modified_date', 'last_modified_by']

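# Convert the DataFrame to a Flink table, using col as the column names.
table = table_env.from_pandas(df, col)

# The query fails at order_by because the environment is in streaming mode.
table.filter("candidate_id > 322445")\
    .filter("candidate_first_name === 'Libby'")\
    .group_by("candidate_id, candidate_source_id")\
    .select("candidate_id, candidate_source_id")\
    .order_by("candidate_id").to_pandas()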

The error is:

Py4JJavaError: An error occurred while calling o3164.orderBy.
: org.apache.flink.table.api.ValidationException: A limit operation on unbounded tables is currently not supported.
    at org.apache.flink.table.operations.utils.SortOperationFactory.failIfStreaming(SortOperationFactory.java:131)
    at org.apache.flink.table.operations.utils.SortOperationFactory.createSort(SortOperationFactory.java:63)
    at org.apache.flink.table.operations.utils.OperationTreeBuilder.sort(OperationTreeBuilder.java:409)
    at org.apache.flink.table.api.internal.TableImpl.orderBy(TableImpl.java:401)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)

Solution

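  • According to the documentation, the Table API supports ORDER BY only for batch queries. The table environment here is created in streaming mode, which is why the call fails in failIfStreaming (see the stack trace). Since the input is a bounded CSV file, one option is to run the job in batch mode instead. If you switch to SQL, you can have streaming queries that sort on an ascending time attribute.

    Sorting by anything else in an unbounded streaming query is simply impossible, since sorting requires full knowledge of the input.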
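To make the reasoning above concrete, here is a minimal, Flink-independent sketch in plain Python. It is not how Flink implements sorting. The function names, the (event_time, payload) event shape, and the max_delay parameter are hypothetical and chosen only for illustration. The first function shows a general sort, which cannot emit anything until the whole input has been read. The second shows why sorting on an ascending time attribute is feasible on a stream: if we assume that no event arrives more than max_delay time units late, everything older than a watermark can safely be emitted in order.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

# Hypothetical event shape for this sketch: (event_time, payload).
Event = Tuple[int, str]


def sort_bounded(events: Iterable[Event]) -> List[Event]:
    """Batch-style sort: consumes the entire input before returning.

    On an unbounded stream this would never return, which is why a general
    ORDER BY only makes sense for bounded (batch) input.
    """
    return sorted(events)


def sort_by_time_streaming(events: Iterable[Event], max_delay: int) -> Iterator[Event]:
    """Yield events in ascending time order without waiting for the end of input.

    Assumption (not taken from the original post): no event arrives more than
    max_delay time units after a later event has been seen.
    """
    buffer: List[Event] = []
    watermark = float("-inf")
    for event in events:
        heapq.heappush(buffer, event)
        # Everything at or below the watermark can no longer be overtaken.
        watermark = max(watermark, event[0] - max_delay)
        while buffer and buffer[0][0] <= watermark:
            yield heapq.heappop(buffer)
    # Only reached if the input turns out to be bounded: flush the rest.
    while buffer:
        yield heapq.heappop(buffer)


if __name__ == "__main__":
    arrivals = [(1, "a"), (3, "b"), (2, "c"), (5, "d"), (4, "e")]
    for item in sort_by_time_streaming(arrivals, max_delay=1):
        print(item)
```

Sorting by an arbitrary column such as candidate_id has no equivalent of the watermark: a smaller value could always arrive later, so no row could ever be emitted safely.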