I have an Apache Beam pipeline which tries to write to Postgres after reading from BigQuery. The code uses JdbcIO connector and Dataflow runner. I am using Python 3.8.7 and Apache Beam 2.28.0
I was using default expansion service. I also tried to run a custom expansion service but still got the same error. Any idea ?
The code is as follow
def export_to_postgres(user_options, pipeline_options, password):
"""Creates a pipeline that writes entities to postgres."""
TeacherRow = NamedTuple(
"TeacherRow",
[
("teacher_id", str),
("first_name", str),
("last_name", str),
("total_all_publisher", int)
])
coders.registry.register_coder(TeacherRow, coders.RowCoder)
p = beam.Pipeline(options=pipeline_options)
(p
| 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
query=user_options.query_src,
use_standard_sql=True
)
| beam.Map(lambda x:
TeacherRow(teacher_id=str(x.teacher_id),
first_name=str(x.first_name),
last_name=str(x.last_name),
total_all_publisher=int(x.total_all_publisher)))
.with_output_types(TeacherRow)
| beam.WindowInto(beam.window.FixedWindows(10))
.with_output_types(TeacherRow)
| 'Write to jdbc' >> WriteToJdbc(
table_name="teacher",
driver_class_name='org.postgresql.Driver',
jdbc_url='jdbc:{}://{}:{}/{}'.format("postgresql", "your ip address", "5432", "postgres"),
username="postgres",
password="password"
)
)
p.run()
I am getting the following error
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/Users/trex/workspace/workflow/dataflow/bq-to-pg.py", line 102, in <module>
run()
File "/Users/Trex/workspace/workflow/dataflow/bq-to-pg.py", line 97, in run
export_to_postgres(user_options, pipeline_options, password)
File "/Users/trex/workspace/workflow/dataflow/bq-to-pg.py", line 58, in export_to_postgres
p.run()
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
return Pipeline.from_runner_api(
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
transform = ptransform.PTransform.from_runner_api(proto, context)
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
return constructor(
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
DoFnInfo.from_runner_api(
File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1```
This is https://issues.apache.org/jira/browse/BEAM-12043, hopefully a fix can get into the next release.