python, google-cloud-platform, apache-beam, dataflow, google-dataflow

Handling rejects in Dataflow/Apache Beam through dependent pipelines


I have a pipeline that gets data from BigQuery and writes it to GCS; however, if I find any rejects I want to write them to a BigQuery table. I am collecting rejects into a global list variable and later loading the list into a BigQuery table. This process works fine when I run it locally, because the pipelines run in the right order. When I run it using DataflowRunner, the order isn't guaranteed (I want pipeline1 to run before pipeline2). Is there a way to have dependent pipelines in Dataflow using Python? Or please suggest whether this can be solved with a better approach. Thanks in advance.

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'transform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable
               ....etc
               | 'to gcs' >> beam.io.WriteToText(output)
               )

# Loading the rejects gathered in the above pipeline to BigQuery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
    rejects = (pipeline2
                    | 'create pipeline' >> beam.Create(reject_list)
                    | 'to json format' >> beam.Map(lambda data: {.....})
                    | 'to bq' >> beam.io.WriteToBigQuery(....)
                    )

Solution

  • You can do something like that, but with only one pipeline and some additional code in the transformation.

    The transform step (your beam.Map(lambda x: somefunction)) should have two outputs: the one that is written to GCS, and the rejected elements that will eventually be written to BigQuery.

    For that, your transform function would have to emit TaggedOutput values.

    There is an example in the Beam Programming Guide: https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn
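
    As a minimal, self-contained sketch of that pattern (all names below are made up for illustration, and it runs locally on the DirectRunner):

    import apache_beam as beam
    from apache_beam import pvalue

    class SplitRecords(beam.DoFn):
        # Illustrative DoFn: valid records go to the main output,
        # invalid ones to an additional output tagged 'rejected'.
        def process(self, element):
            if element is None:  # stand-in for a real validation check
                yield pvalue.TaggedOutput('rejected', element)
            else:
                yield element

    with beam.Pipeline() as p:
        results = (p
                   | beam.Create([1, None, 2])
                   | beam.ParDo(SplitRecords()).with_outputs('rejected', main='valid'))
        results.valid | 'print valid' >> beam.Map(print)
        results.rejected | 'print rejected' >> beam.Map(print)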

    You can then write the second PCollection to BigQuery.
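
    For example, the sink for that rejected branch could be configured roughly like this (the PCollection name, table spec, and schema here are hypothetical placeholders; the rejected elements would need to be dicts keyed by column name):

    rejected_pcoll | 'rejects to bq' >> beam.io.WriteToBigQuery(
        table='my-project:my_dataset.rejected_rows',             # hypothetical table
        schema='raw_record:STRING,error_message:STRING',         # hypothetical schema
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)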

    You don't need to have a Create in this second branch of the pipeline.

    The pipeline would be something like this:

    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:

        data = (pipeline1
                   | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=..., use_standard_sql=True))
                   | 'combine output to list' >> beam.combiners.ToList()
                   # with_outputs() exposes the tagged outputs as attributes of the result
                   | 'transform' >> beam.FlatMap(transform).with_outputs('rejected', 'gcs_output')
                   )

        pcoll_to_gcs = data.gcs_output
        pcoll_to_bq  = data.rejected

        pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
        pcoll_to_bq  | "to bq" >> beam.io.WriteToBigQuery(....)


    The transform function would then be something like this:

    from apache_beam import pvalue

    def transform(element):
        if something_is_wrong_with_element:
            yield pvalue.TaggedOutput('rejected', element)
            return  # a rejected element should not also be written to GCS

        transformed_element = ....

        yield pvalue.TaggedOutput('gcs_output', transformed_element)
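
    Since everything now runs in a single pipeline, the rejected rows are written to BigQuery by the same Dataflow job that writes the good rows to GCS, so there is no cross-pipeline ordering to manage and no need for the global list variable (which would not work on the DataflowRunner anyway, because workers run in separate processes and do not share Python globals).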