I have a pipeline that reads data from BigQuery and writes it to GCS. If I find any rejects, I want to write them to a BigQuery table. I am collecting the rejects in a global list variable and later loading the list into the BigQuery table. This works fine when I run it locally, because the pipelines run in the right order. When I run it with DataflowRunner, the order is not guaranteed (I want pipeline1 to run before pipeline2). Is there a way to have dependent pipelines in Dataflow using Python? Please also suggest whether this can be solved with a better approach. Thanks in advance.
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
        data = (pipeline1
                | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=..., use_standard_sql=True))
                | 'combine output to list' >> beam.combiners.ToList()
                | 'transform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable
                ....etc
                | 'to gcs' >> beam.io.WriteToText(output)
                )
    # Loading the rejects gathered in the above pipeline to BigQuery
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
        rejects = (pipeline2
                   | 'create pipeline' >> beam.Create(reject_list)
                   | 'to json format' >> beam.Map(lambda data: {.....})
                   | 'to bq' >> beam.io.WriteToBigQuery(....)
                   )
You can do something like that, but with only one pipeline and some additional code in the transformation.
The beam.Map(lambda x: somefunction) step should have two outputs: the one that is written to GCS, and the rejected elements that will eventually be written to BigQuery.
For that, your transform function would have to return a TaggedOutput.
There is an example in the Beam Programming Guide: https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn
You can then write the second PCollection to BigQuery.
You don't need a Create in this second branch of the pipeline.
The pipeline would be something like this:
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
        data = (pipeline1
                | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=..., use_standard_sql=True))
                | 'combine output to list' >> beam.combiners.ToList()
                # FlatMap because transform is a generator; with_outputs exposes the tags as attributes
                | 'transform' >> beam.FlatMap(transform).with_outputs('gcs_output', 'rejected'))
        pcoll_to_gcs = data.gcs_output
        pcoll_to_bq = data.rejected
        pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
        pcoll_to_bq | "to bq" >> beam.io.WriteToBigQuery(....)
Then the transform function would be something like this:
    from apache_beam import pvalue

    def transform(element):
        if something_is_wrong_with_element:
            yield pvalue.TaggedOutput('rejected', element)
            return  # a rejected element should not also be emitted to GCS
        transformed_element = ....
        yield pvalue.TaggedOutput('gcs_output', transformed_element)