
Beam/Dataflow read Parquet files and add file name/path to each record


I'm using the Apache Beam Python SDK and trying to read data from Parquet files with apache_beam.io.parquetio, but I also want to attach the filename (or path) to each record, since the path itself carries information. I went over the suggested pattern here and read that parquetio is similar to fileio, but it doesn't seem to implement the functionality that lets you iterate over files and attach the file name to each record.

Anyone figured a good way to implement this?

Thanks!


Solution

  • If the number of files is not too large, you can list them all up front and read each one through its own IO transform.

    import glob

    import apache_beam as beam

    filelist = glob.glob('/tmp/*.parquet')
    p = beam.Pipeline()

    class PairWithFile(beam.DoFn):
        def __init__(self, filename):
            self._filename = filename

        def process(self, e):
            # Pair every record with the file it was read from.
            yield (self._filename, e)

    # One Read/Pair branch per file, then flatten all branches
    # into a single PCollection.
    file_with_records = [
        (p
         | 'Read %s' % file >> beam.io.ReadFromParquet(file)
         | 'Pair %s' % file >> beam.ParDo(PairWithFile(file)))
        for file in filelist
    ] | beam.Flatten()
    

    Then your PCollection holds (filename, record) tuples, one per record, keyed by the file it was read from.