I am trying to write an Apache Beam pipeline that splits into three branches, where each branch writes to BigQuery, and then merges back into one branch that writes to another BigQuery table for logging.
I am unable to merge the branches. Here is the code:
pipeline_options = PipelineOptions(None)
p = beam.Pipeline(options=pipeline_options)
ingest_data = (
p
| 'Start Pipeline' >> beam.Create([None])
)
p1 = (ingest_data
      | 'Read from and to date 1' >> beam.ParDo(OutputValueProviderFn('table1'))
      | 'fetch API data 1' >> beam.ParDo(get_api_data())
      | 'write into gbq 1' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table='proj.dataset.table1',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            custom_gcs_temp_location='gs://project/temp')
)
p2 = (ingest_data
      | 'Read from and to date 2' >> beam.ParDo(OutputValueProviderFn('table2'))
      | 'fetch API data 2' >> beam.ParDo(get_api_data())
      | 'write into gbq 2' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table='proj.dataset.table2',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            custom_gcs_temp_location='gs://proj/temp')
)
p3 = (ingest_data
      | 'Read from and to date 3' >> beam.ParDo(OutputValueProviderFn('table3'))
      | 'fetch API data 3' >> beam.ParDo(get_api_data())
      | 'write into gbq 3' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table='proj.dataset.table3',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            custom_gcs_temp_location='gs://proj/temp')
)
# Here I would like to merge the three branches into one: This doesn't work
merge = ((p1, p2, p3)
         | 'Write Log' >> beam.io.gcp.bigquery.WriteToBigQuery(
               table='proj.dataset.table_logging',
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               custom_gcs_temp_location='gs://proj/temp')
)
This causes the following error:
AttributeError: Error trying to access nonexistent attribute `0` in write result. Please see __documentation__ for available attributes.
I don't care about the outputs of the three branches; I only need to merge them to be sure the three previous writes have completed.
Beam inspector graph looks like this:
You can't merge the outputs of WriteToBigQuery: it returns a WriteResult object, not a PCollection, which is why Beam complains about the nonexistent attribute `0`. You need to redesign your pipeline so the branches are merged before the writes:
ingest_data = (
p
| 'Start Pipeline' >> beam.Create([None])
)
p1 = (ingest_data | 'Read from and to date 1' >> beam.ParDo(OutputValueProviderFn('table1'))
| 'fetch API data 1' >> beam.ParDo(get_api_data())
)
p2 = (ingest_data | 'Read from and to date 2' >> beam.ParDo(OutputValueProviderFn('table2'))
| 'fetch API data 2' >> beam.ParDo(get_api_data())
)
p3 = (ingest_data | 'Read from and to date 3' >> beam.ParDo(OutputValueProviderFn('table3'))
| 'fetch API data 3' >> beam.ParDo(get_api_data())
)
p1_to_bq = (p1 | 'write into bq 1' >> beam.io.gcp.bigquery.WriteToBigQuery(
                     table='proj.dataset.table1',
                     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                     custom_gcs_temp_location='gs://project/temp')
)
p2_to_bq = (p2 | 'write into bq 2' >> beam.io.gcp.bigquery.WriteToBigQuery(
                     table='proj.dataset.table2',
                     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                     custom_gcs_temp_location='gs://project/temp')
)
p3_to_bq = (p3 | 'write into bq 3' >> beam.io.gcp.bigquery.WriteToBigQuery(
                     table='proj.dataset.table3',
                     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                     custom_gcs_temp_location='gs://project/temp')
)
merge = ((p1, p2, p3)
         | 'Merge branches' >> beam.Flatten()
         | 'Write Log' >> beam.io.gcp.bigquery.WriteToBigQuery(
               table='proj.dataset.table_logging',
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               custom_gcs_temp_location='gs://proj/temp')
)
Note that a tuple of PCollections can't be piped straight into WriteToBigQuery; beam.Flatten() is what merges them into a single PCollection. Also note that Flatten merges the branch data itself, so the log write runs in parallel with the three table writes rather than after them.