Search code examples
pythongoogle-cloud-platformgoogle-cloud-dataflowapache-beamfastavro

In Apache Beam/Dataflow's WriteToBigQuery transform, how do you enable the deadletter pattern with Method.FILE_LOADS and Avro temp_file_format


In this document, Apache Beam suggests the deadletter pattern when writing to BigQuery. This pattern allows you to fetch rows that failed to be written from the transform output with the 'FailedRows' tag.

However, when I try to use it:

WriteToBigQuery(
    table=self.bigquery_table_name,
    schema={"fields": self.bigquery_table_schema},
    method=WriteToBigQuery.Method.FILE_LOADS,
    temp_file_format=FileFormat.AVRO,
)

A schema mismatch in one of my elements causes the following exception:

Error message from worker: Traceback (most recent call last):
File 
    "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1630, 
    in write self._avro_writer.write(row) File "fastavro/_write.pyx", line 647,
    in fastavro._write.Writer.write File "fastavro/_write.pyx", line 376,
    in fastavro._write.write_data File "fastavro/_write.pyx", line 320,
    in fastavro._write.write_record File "fastavro/_write.pyx", line 374,
    in fastavro._write.write_data File "fastavro/_write.pyx", line 283,
    in fastavro._write.write_union ValueError: [] (type <class 'list'>) do not match ['null', 'double'] on field safety_proxy During handling of the above exception, another exception occurred: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1198,
    in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 718,
    in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 841,
    in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1334,
    in apache_beam.runners.common._OutputProcessor.process_outputs File "/my_code/apache_beam/io/gcp/bigquery_file_loads.py", line 258,
    in process writer.write(row) File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1635,
    in write ex, self._avro_writer.schema, row)).with_traceback(tb) File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1630,
    in write self._avro_writer.write(row) File "fastavro/_write.pyx", line 647,
    in fastavro._write.Writer.write File "fastavro/_write.pyx", line 376,
    in fastavro._write.write_data File "fastavro/_write.pyx", line 320,
    in fastavro._write.write_record File "fastavro/_write.pyx", line 374,
    in fastavro._write.write_data File "fastavro/_write.pyx", line 283,
    in fastavro._write.write_union ValueError: Error writing row to Avro: [] (type <class 'list'>) do not match ['null', 'double'] on field safety_proxy Schema: ...

From what I gather, the schema mismatch causes fastavro._write.Writer.write to fail and throw an exception. Instead, I would like WriteToBigQuery to apply the deadletter behavior and return my malformed rows as FailedRows tagged output. Is there a way to achieve this?

Thanks

EDIT: Adding more detailed example of what I'm trying to do:

from apache_beam import Create
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
from apache_beam.io.textio import WriteToText

...

valid_rows = [{"some_field_name": i} for i in range(1000000)]
invalid_rows = [{"wrong_field_name": i}]

pcoll = Create(valid_rows + invalid_rows)

# This fails because of the 1 invalid row
write_result = (
    pcoll 
    |  WriteToBigQuery(
        table=self.bigquery_table_name,
        schema={
            "fields": [
                {'name': 'some_field_name', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ]
        },
        method=WriteToBigQuery.Method.FILE_LOADS,
        temp_file_format=FileFormat.AVRO,
    )
)

# What I want is for WriteToBigQuery to partially succeed and output the failed rows.
# This is because I have pipelines that run for multiple hours and fail because of 
# a small amount of malformed rows
(
    write_result[BigQueryWriteFn.FAILED_ROWS] 
    | WriteToText('gs://my_failed_rows/')
)

Solution

  • You can use a dead letter queue in the pipeline instead of let BigQuery catch errors for you. Beam proposes a native way for error handling and dead letter queue with TupleTags but the code is little verbose.

    I created an open source library called Asgarde for Python sdk and Java sdk to apply error handling for less code, more concise and expressive code :

    https://github.com/tosun-si/pasgarde

    (also the Java version : https://github.com/tosun-si/asgarde)

    You can install it with pip :

    asgarde==0.16.0
    
    pip install asgarde==0.16.0
    
    
    from apache_beam import Create
    from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
    from apache_beam.io.textio import WriteToText
    from asgarde.collection_composer import CollectionComposer
    
    def validate_row(self, row) -> Dict :
        field = row['your_field']
            
        if field is None or field == '':
            # You can raise your own custom exception
            raise ValueError('Bad field')
    ...
    
    valid_rows = [{"some_field_name": i} for i in range(1000000)]
    invalid_rows = [{"wrong_field_name": i}]
    
    pcoll = Create(valid_rows + invalid_rows)
    
    # Dead letter queue proposed by Asgarde, it's return output and Failure PCollection.
    output_pcoll, failure_pcoll = (CollectionComposer.of(pcoll)
      .map(self.validate_row))
    
    # Good sink
    (
        output_pcoll 
        |  WriteToBigQuery(
            table=self.bigquery_table_name,
            schema={
                "fields": [
                    {'name': 'some_field_name', 'type': 'INTEGER', 'mode': 'NULLABLE'},
                ]
            },
            method=WriteToBigQuery.Method.FILE_LOADS
        )
    )
    
    # Bad sink : PCollection[Failure] / Failure contains inputElement and 
    # stackTrace.
    (
        failure_pcoll 
        | beam.Map(lambda failure : self.your_failure_transformation(failure))
        |  WriteToBigQuery(
            table=self.bigquery_table_name,
            schema=your_schema_for_failure_table,
            method=WriteToBigQuery.Method.FILE_LOADS
        )
    )
    
    

    The structure of Failure object proposed by Asgarde lib :

    @dataclass
    class Failure:
        pipeline_step: str
        input_element: str
        exception: Exception
    

    In the validate_row function, you will apply your validation logic and detect bad fields. You will raise an exception in this case, and Asgarde will catch the error for you.

    The result of CollectionComposer flow is :

    • PCollection of output, in this case, I think is a PCollection[Dict]
    • PCollection[Failure]

    At the end you can process to multi sink :

    • Write good outputs to Bigquery
    • Write failures to Bigquery

    You can also apply the same logic with native Beam error handling and TupleTags, I proposed an exemple in a project from my Github repository :

    https://github.com/tosun-si/teams-league-python-dlq-native-beam-summit/blob/main/team_league/domain_ptransform/team_stats_transform.py