
Apache Beam - ReadFromText safely (pass over errors)


I have a simple Apache Beam pipeline which reads compressed bz2 files and writes them out to text files.

import apache_beam as beam

p1 = beam.Pipeline()

(p1
        | 'read' >> beam.io.ReadFromText('bad_file.bz2')
        | 'write' >> beam.io.WriteToText('file_out.txt')
 )

p1.run()

The problem arises when the pipeline encounters a bad file (example). Most of my bad files are malformed, not in bz2 format at all, or simply empty. These confuse the decompressor, which raises OSError: Invalid data stream.

How can I tell ReadFromText to skip these files instead of failing?
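The failure modes described above can be reproduced without Beam. The sketch below uses only Python's standard bz2 module; classify_bz2 is a hypothetical helper and its labels are just for illustration. Input that is not bz2 (or is corrupt) raises the same OSError: Invalid data stream, while a stream that stops before its end-of-stream marker makes bz2.decompress raise ValueError instead.

```python
import bz2


def classify_bz2(data: bytes) -> str:
    """Label raw file contents by how Python's bz2 module reacts to them."""
    if not data:
        # Zero-byte file: there is no bz2 stream at all.
        return "empty"
    try:
        bz2.decompress(data)
    except OSError:
        # Bad header or corrupt compressed data: "Invalid data stream".
        return "invalid"
    except ValueError:
        # Valid start, but the data ends before the end-of-stream marker.
        return "truncated"
    return "ok"


if __name__ == "__main__":
    good = bz2.compress(b"hello\n")
    for label, data in [("good", good), ("text", b"not bzip2"),
                        ("truncated", good[:4]), ("empty", b"")]:
        print(label, classify_bz2(data))
```

Only inputs labelled "ok" can be read end to end; the other three are the kinds of files the pipeline needs to skip.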


Solution

  • You can filter out the invalid files first and then read the remaining ones with apache_beam.io.textio.ReadAllFromText.

    For example

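    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.io.filesystem import CompressionTypes
    from apache_beam.io.filesystems import FileSystems
    
    
    def is_valid_bz2(path):
        """Return True if the start of the file at `path` decompresses as bz2."""
        try:
            # FileSystems.open dispatches on the path scheme (local, gs://, ...).
            # compression_type must be passed by keyword: the second
            # positional parameter of FileSystems.open is mime_type.
            with FileSystems.open(
                    path, compression_type=CompressionTypes.BZIP2) as handle:
                # Reading one byte forces the first block to be decompressed,
                # so a file that is not valid bz2 raises here (e.g. OSError).
                # Corruption later in the file is not caught by this check.
                handle.read(1)
            return True
        except Exception:
            return False
    
    
    with beam.Pipeline() as p:
    
        (p
         | 'match' >> fileio.MatchFiles('/path/to/*.bz2')
         # Drop empty files and files whose first block fails to decompress.
         | 'filter' >> beam.Filter(
             lambda m: m.size_in_bytes > 0 and is_valid_bz2(m.path))
         # ReadAllFromText expects file paths (strings), not FileMetadata.
         | 'to_path' >> beam.Map(lambda m: m.path)
         | 'read' >> beam.io.textio.ReadAllFromText(
             compression_type=CompressionTypes.BZIP2)
         | 'write' >> beam.io.WriteToText('file_out.txt'))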
    

    Here, is_valid_bz2 uses Beam's filesystems utilities, so it can read from any supported filesystem (local disk, GCS, and so on).