Search code examples
pythongoogle-cloud-platformapache-beamdataflow

DataFlow deployment fails with "object has no attribute watermark_estimator_provider" error


Trying to deploy a pipeline. I am following this implementation: https://github.com/GoogleCloudPlatform/professional-services/blob/main/examples/dataflow-python-examples/batch-examples/cookbook-examples/pipelines/data_enrichment.py

Though slightly changing it as the mapping data is in a csv file not in bq.

Error message:

File "dataflow_pipeline.py", line 193, in <module>
    run()
  File "dataflow_pipeline.py", line 188, in run
    p.run().wait_until_finish()
  File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 579, in wait_until_finish
    self._executor.await_completion()
  File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 439, in await_completion
    self._executor.await_completion()
  File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 488, in await_completion
    raise_(t, v, tb)
  File "/usr/local/lib/python3.7/dist-packages/future/utils/__init__.py", line 441, in raise_
    raise exc
  File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 382, in call
    finish_state)
  File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/executor.py", line 420, in attempt_call
    evaluator.process_element(value)
  File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 881, in process_element
    self.runner.process(element)
  File "apache_beam/runners/common.py", line 1210, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/dist-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1347, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/home/demirfaruk/.local/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 133, in process
    self._signature.process_method.watermark_estimator_provider.
AttributeError: 'apache_beam.runners.common.MethodWrapper' object has no attribute 'watermark_estimator_provider' [while running 'Read From Text/Read/_SDFBoundedS
ourceWrapper/ParDo(SDFBoundedSourceDoFn)/pair']

Run function:

def run(argv=None):
"""The main function which creates the pipeline and runs it."""
parser = argparse.ArgumentParser()
# Here we add some specific command line arguments we expect.   Specifically
# we have the input file to load, mapping file and the output table to write to.
parser.add_argument(
    '--input',
    dest='input',
    required=False,
    help='Bas Table csv file to read.',
    default='gs://test-files/Bas_dataset.csv')

parser.add_argument(
    '--mapping',
    dest='mapping',
    required=False,
    help='Opt Table csv file to read.',
    default='gs://test-files/Opt_dataset.csv')

parser.add_argument(
    '--output',
    dest='output',
    required=False,
    help='Output BQ table to write results to.',
    default='transform_access.option_profits')

# Parse arguments from the command line.
known_args, pipeline_args = parser.parse_known_args(argv)

# DataIngestion is a class we built in this script to hold the logic for
# transforming the file into a BigQuery table.
data_ingestion = DataIngestion()

# Initiate the pipeline using the pipeline arguments passed in from the
# command line.  This includes information like where Dataflow should store
#  temp files, and what the project id is
p = beam.Pipeline(Opt=PipelineOpt(pipeline_args))
schema = parse_table_schema_from_json(data_ingestion.bq_schema_str)

def find_exact_cost(row, model_option_to_cost_map):
    row['Production_Cost'] = model_option_to_cost_map[row['ModelOptionKey']]
    return row

def find_avg_cost(row, model_option_to_cost_map):
    row['Production_Cost'] = model_option_to_cost_map[row['Opt_Code']]
    return row

material_cost_m = \
    (p | 'Read From Text 1' >> beam.io.ReadFromText(known_args.mapping,
                                                    skip_header_lines=1)
     | 'String to BigQuery Row 1' >>
     beam.Map(lambda s: data_ingestion.parse_input_csv(s, 'option_csv_m')))

material_cost_o = \
    (p | 'Read From Text 2' >> beam.io.ReadFromText(known_args.mapping,
                                                  skip_header_lines=1)
     | 'String to BigQuery Row 2' >>
     beam.Map(lambda s: data_ingestion.parse_input_csv(s, 'option_csv_o')))

(p
 | 'Read From Text' >> beam.io.ReadFromText(known_args.input,
                                            skip_header_lines=1)
 # Translates from the raw string data in the CSV to a dictionary.
 # The dictionary is a keyed by column names with the values being the values
 # we want to store in BigQuery.
 | 'String to BigQuery Row Bas' >>
 beam.Map(lambda s: data_ingestion.parse_input_csv(s, 'Bas_csv'))
 # Here we pass in a side input, which is data that comes from Opt
 # CSV source.  The side input contains a map of model&option to material cost.
 | 'Join Data Exact' >> beam.Map(find_exact_cost, AsDict(material_cost_m))
 | 'Join Data Avg' >> beam.Map(find_avg_cost, AsDict(material_cost_o))

 # This is the final stage of the pipeline, where we define the destination
 #  of the data.  In this case we are writing to BigQuery.
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                # The table name is a required argument for the BigQuery sink.
                # In this case we use the value passed in from the command line.
                known_args.output,
                # Here we use the JSON schema read in from a JSON file.
                # Specifying the schema allows the API to create the table correctly if it does not yet exist.
                schema=schema,
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                # Deletes all data in the BigQuery table before writing.
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()

Solution

  • I was able to reproduce your issue when I followed data_enrichment.py. I was able to make it work when I used WriteToBigQuery, because BigQuerySink is already deprecated since version 2.11.0. WriteToBigQuery has more parameters compared to BigQuerySink. Replace BigQuerySink to WriteToBigQuery and just add custom_gcs_temp_location in the parameters.

        beam.io.WriteToBigQuery(
            known_args.output,
            custom_gcs_temp_location='gs://your_temp_gcs_location_here/'
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    

    WriteToBigQuery

    classapache_beam.io.gcp.bigquery.WriteToBigQuery(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND', kms_key=None, batch_size=None, max_file_size=None, max_files_per_bundle=None, test_client=None, custom_gcs_temp_location=None, method=None, insert_retry_strategy=None, additional_bq_parameters=None, table_side_inputs=None, schema_side_inputs=None, triggering_frequency=None, validate=True, temp_file_format=None, ignore_insert_ids=False)

    BigQuerySink

    classapache_beam.io.gcp.bigquery.BigQuerySink(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_EMPTY', validate=False, coder=None, kms_key=None)

    The main difference of the two is BigQuerySink only supports batch pipelines, while WriteToBigQuery works for both batch and steaming pipelines.