I am trying to deploy a pipeline. I am following this implementation: https://github.com/GoogleCloudPlatform/professional-services/blob/main/examples/dataflow-python-examples/batch-examples/cookbook-examples/pipelines/data_enrichment.py
I am slightly changing it, though, because the mapping data is in a CSV file rather than in BigQuery.
Error message:
File "dataflow_pipeline.py", line 193, in <module>
run()
File "dataflow_pipeline.py", line 188, in run
p.run().wait_until_finish()
File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 579, in wait_until_finish
self._executor.await_completion()
File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 439, in await_completion
self._executor.await_completion()
File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 488, in await_completion
raise_(t, v, tb)
File "/usr/local/lib/python3.7/dist-packages/future/utils/__init__.py", line 441, in raise_
raise exc
File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 382, in call
finish_state)
File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 420, in attempt_call
evaluator.process_element(value)
File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 881, in process_element
self.runner.process(element)
File "apache_beam/runners/common.py", line 1210, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/dist-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1347, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 133, in process
self._signature.process_method.watermark_estimator_provider.
AttributeError: 'apache_beam.runners.common.MethodWrapper' object has no attribute 'watermark_estimator_provider' [while running 'Read From Text/Read/_SDFBoundedS
ourceWrapper/ParDo(SDFBoundedSourceDoFn)/pair']
Run function:
# Imports needed by run() (as in the referenced data_enrichment.py example):
import argparse

import apache_beam as beam
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pvalue import AsDict


def run(argv=None):
    """The main function which creates the pipeline and runs it."""
    parser = argparse.ArgumentParser()
    # Here we add the specific command line arguments we expect: the input
    # file to load, the mapping file and the output table to write to.
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Bas Table csv file to read.',
        default='gs://test-files/Bas_dataset.csv')
    parser.add_argument(
        '--mapping',
        dest='mapping',
        required=False,
        help='Opt Table csv file to read.',
        default='gs://test-files/Opt_dataset.csv')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output BQ table to write results to.',
        default='transform_access.option_profits')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataIngestion is a class we built in this script to hold the logic for
    # transforming the file into a BigQuery table.
    data_ingestion = DataIngestion()

    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information like where Dataflow should store
    # temp files and what the project id is.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    schema = parse_table_schema_from_json(data_ingestion.bq_schema_str)

    def find_exact_cost(row, model_option_to_cost_map):
        row['Production_Cost'] = model_option_to_cost_map[row['ModelOptionKey']]
        return row

    def find_avg_cost(row, model_option_to_cost_map):
        row['Production_Cost'] = model_option_to_cost_map[row['Opt_Code']]
        return row

    material_cost_m = (
        p
        | 'Read From Text 1' >> beam.io.ReadFromText(known_args.mapping,
                                                     skip_header_lines=1)
        | 'String to BigQuery Row 1' >>
        beam.Map(lambda s: data_ingestion.parse_input_csv(s, 'option_csv_m')))

    material_cost_o = (
        p
        | 'Read From Text 2' >> beam.io.ReadFromText(known_args.mapping,
                                                     skip_header_lines=1)
        | 'String to BigQuery Row 2' >>
        beam.Map(lambda s: data_ingestion.parse_input_csv(s, 'option_csv_o')))

    (p
     | 'Read From Text' >> beam.io.ReadFromText(known_args.input,
                                                skip_header_lines=1)
     # Translates the raw string data from the CSV into a dictionary keyed by
     # column names, with the values being the values we want to store in BigQuery.
     | 'String to BigQuery Row Bas' >>
     beam.Map(lambda s: data_ingestion.parse_input_csv(s, 'Bas_csv'))
     # Here we pass in a side input, which is data that comes from the Opt CSV
     # source. The side input contains a map of model&option to material cost.
     | 'Join Data Exact' >> beam.Map(find_exact_cost, AsDict(material_cost_m))
     | 'Join Data Avg' >> beam.Map(find_avg_cost, AsDict(material_cost_o))
     # This is the final stage of the pipeline, where we define the destination
     # of the data. In this case we are writing to BigQuery.
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # Here we use the JSON schema read in from a JSON file. Specifying
             # the schema allows the API to create the table correctly if it
             # does not yet exist.
             schema=schema,
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

    p.run().wait_until_finish()
I was able to reproduce your issue when I followed data_enrichment.py, and I was able to make it work by using WriteToBigQuery, because BigQuerySink has been deprecated since version 2.11.0. WriteToBigQuery takes more parameters than BigQuerySink. Replace BigQuerySink with WriteToBigQuery (dropping the beam.io.Write(...) wrapper, since WriteToBigQuery is a transform that is applied directly) and add custom_gcs_temp_location to its parameters:
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
    known_args.output,
    custom_gcs_temp_location='gs://your_temp_gcs_location_here/',
    schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
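As a side note, and this is an assumption about your setup rather than part of the fix above: custom_gcs_temp_location is needed because WriteToBigQuery stages load files on GCS in batch mode, and it falls back to the pipeline's temp_location when no custom location is given. So, as a minimal sketch (the bucket name is a placeholder), you could instead set temp_location in the pipeline options:
# Placeholder bucket; WriteToBigQuery uses the pipeline's temp_location
# when custom_gcs_temp_location is not provided.
p = beam.Pipeline(options=PipelineOptions(
    pipeline_args + ['--temp_location=gs://your_temp_gcs_location_here/']))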
class apache_beam.io.gcp.bigquery.WriteToBigQuery(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND', kms_key=None, batch_size=None, max_file_size=None, max_files_per_bundle=None, test_client=None, custom_gcs_temp_location=None, method=None, insert_retry_strategy=None, additional_bq_parameters=None, table_side_inputs=None, schema_side_inputs=None, triggering_frequency=None, validate=True, temp_file_format=None, ignore_insert_ids=False)
class apache_beam.io.gcp.bigquery.BigQuerySink(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_EMPTY', validate=False, coder=None, kms_key=None)
The main difference between the two is that BigQuerySink only supports batch pipelines, while WriteToBigQuery works for both batch and streaming pipelines.
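For reference, here is a minimal, self-contained batch sketch of WriteToBigQuery; the destination table, field names and bucket below are placeholders rather than values taken from your pipeline:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder destination table and GCS bucket; replace with your own.
OUTPUT_TABLE = 'your-project:your_dataset.your_table'
TEMP_BUCKET = 'gs://your_temp_gcs_location_here/'

with beam.Pipeline(options=PipelineOptions()) as p:
    (p
     | 'Create rows' >> beam.Create(
         [{'ModelOptionKey': 'A1', 'Production_Cost': 10.0}])
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
         OUTPUT_TABLE,
         # A comma-separated schema string works here as well as a TableSchema.
         schema='ModelOptionKey:STRING,Production_Cost:FLOAT',
         custom_gcs_temp_location=TEMP_BUCKET,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))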