After submitting a Python Table API job that involves user-defined functions (UDFs) to a local cluster, it crashes with a py4j.protocol.Py4JJavaError caused by:
java.util.ServiceConfigurationError: org.apache.beam.sdk.options.PipelineOptionsRegistrar: org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype
I am aware that this error usually indicates a dependency/classloading problem on the lib path. I have already followed all the instructions at the following link: https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html
I have extensively tried different configurations of the classloader.parent-first-patterns.additional
config option. Different entries starting with org.apache.beam.sdk.[...]
have only led to different, additional error messages.
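One of the attempted configurations looked roughly like this (a sketch of a flink-conf.yaml entry; the exact pattern list varied between attempts, and none of them resolved the error):

```yaml
# flink-conf.yaml — illustrative attempt, did NOT fix the problem.
# Forces classes matching these prefixes to be loaded parent-first
# instead of from the user-code classloader.
classloader.parent-first-patterns.additional: org.apache.beam.sdk.options.;org.apache.beam.sdk.
```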
The following Apache Beam-related dependencies are on the lib path:
I can also rule out my own code as the cause, since I tested the following sample code from the project website: https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")
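As a side note, since the UDF body is plain Python, its logic can be sanity-checked locally without any cluster. The helper name add_impl below is illustrative, not part of the sample above:

```python
# Illustrative sketch: keep the UDF body as a named Python function so
# it can be tested without Flink. In the job it would be wrapped as
# udf(add_impl, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT()).
def add_impl(i, j):
    return i + j

# Plain-Python check of the UDF logic before submitting the job.
print(add_impl(1, 2))
```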
When I execute this code, the same error message appears.
Does anyone have a working Flink cluster configuration that can run Python Table API jobs with UDFs? Many thanks in advance for any tips!
The problem is solved by upgrading to the new Apache Flink version 1.10.1. The sample script shown in the question can now be executed without any problems via the binaries with the command run -py path/to/script.
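For completeness, the full submission would look roughly like this (the paths and the cluster start step are assumptions based on a standard Flink 1.10.1 binary distribution; /path/to/script.py is a placeholder):

```shell
# From the root of the extracted Flink 1.10.1 binary distribution:
# start the local cluster, then submit the Python Table API job.
./bin/start-cluster.sh
./bin/flink run -py /path/to/script.py
```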
As for the dependencies: they are already included in the shipped flink_table_x.xx-1.10.1.jar, so no further dependencies need to be added to the lib path, which is what the debugging/configuration attempt in the question tried to do.