Tags: java, python, apache-flink, apache-beam, py4j

Apache Flink Python Table API UDF Dependencies Problem


After I submit a Python Table API job that involves user-defined functions (UDFs) to a local cluster, it crashes with a py4j.protocol.Py4JJavaError caused by

java.util.ServiceConfigurationError: org.apache.beam.sdk.options.PipelineOptionsRegistrar: org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype.

I am aware that this is a bug concerning the dependencies on the lib path/classloading. I have already tried to follow all instructions at the following link: https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html

I have extensively tried different configurations of the classloader.parent-first-patterns.additional config option. Different entries with org.apache.beam.sdk.[...] have led to different, additional error messages.
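For illustration, one such attempt in flink-conf.yaml looked roughly like the sketch below; the concrete patterns are only examples of what was tried, not a known working fix:

# flink-conf.yaml -- illustrative only; the exact Beam patterns below are assumptions
classloader.parent-first-patterns.additional: org.apache.beam.sdk.;org.apache.beam.vendor.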

The following Apache Beam-related dependencies are on the lib path:

  • beam-model-fn-execution-2.20.jar
  • beam-model-job-management-2.20.jar
  • beam-model-pipeline-2.20.jar
  • beam-runners-core-construction-java-2.20.jar
  • beam-runners-java-fn-execution-2.20.jar
  • beam-sdks-java-core-2.20.jar
  • beam-sdks-java-fn-execution-2.20.jar
  • beam-vendor-grpc-1_21_0-0.1.jar
  • beam-vendor-grpc-1_26_0.0.3.jar
  • beam-vendor-guava-26_0-jre-0.1.jar
  • beam-vendor-sdks-java-extensions-protobuf-2.20.jar

I can also rule out that it is caused by my code, as I have tested the following sample code from the project website: https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

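# Python scalar UDF that adds two BIGINT values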
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

t_env.register_function("add", add)

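# source table: CSV file /tmp/input with two BIGINT columns 'a' and 'b'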
t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

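# sink table: CSV file /tmp/output with a single BIGINT column 'sum'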
t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

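# apply the UDF to every row of the source and write the result to the sink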
t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")

When executing this code, the same error message appears.

Does anyone have a description of a Flink cluster configuration that can run Python Table API jobs with UDFs? Many thanks in advance for any tips!


Solution

  • The problem is solved by the new version 1.10.1 of Apache Flink. The sample script shown in the question can now be executed via the binaries with the command run -py path/to/script without any problems (see the sketch below).

    As for the dependencies, they are already bundled in the flink-table_x.xx-1.10.1.jar that ships with the distribution, so no further dependencies need to be added to the lib path, which is what the debugging/configuration attempt described in the question did.
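    A minimal sketch of the submission with the 1.10.1 binaries, assuming a local installation and a pip-installed apache-flink package of the same version; the script path is a placeholder:

    # install the matching Python package (version pinned to the cluster version)
    python -m pip install apache-flink==1.10.1

    # start a local cluster and submit the job via its Python entry point
    ./bin/start-cluster.sh
    ./bin/flink run -py /path/to/tutorial_job.py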