python-3.x google-cloud-storage apache-beam dataflow

Convert a list into a PCollection


I currently have a DoFn that lists all the files in a bucket under a given directory prefix. This DoFn returns a list instead of a PCollection. How would I convert this list into a PCollection that can be consumed by the DoFn ConvertFileNames?

  import apache_beam as beam
  from google.cloud import storage

  # List all the files within a subdir
  class ListBlobs(beam.DoFn):
    def start_bundle(self):
      self.storage_client = storage.Client()

    def process(self, prefix):
      bucket = self.storage_client.bucket('xxx')
      # Use the local bucket variable; self.bucket was never assigned.
      return list(bucket.list_blobs(prefix=prefix))

  # Convert Blobs into filenames as patterns
  class ConvertFileNames(beam.DoFn):
    def process(self, blob):
      # yield a single string; the '/' separates bucket name and object name
      yield 'gs://' + blob.bucket.name + '/' + blob.name

Solution

  • As mentioned in the Beam documentation, a DoFn's process method returns an iterable of elements to place into the downstream PCollection, so returning a list from ListBlobs is already correct: Beam emits each item of that list as a separate element. So, in your example, given a PCollection of prefixes, call it prefix_pcoll, I could write

    blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())
    

    and blobs_pcoll will contain every blob under those prefixes (namely, the concatenation of the lists returned by ListBlobs over all prefixes). You could then write

    converted = blobs_pcoll | beam.ParDo(ConvertFileNames())
    

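    Note that process must return an iterable, so ConvertFileNames should yield its string rather than return it (a bare str is not emitted as a single element). Since each blob maps to exactly one filename, you could also write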

    converted = blobs_pcoll | beam.Map(
        lambda blob: 'gs://' + blob.bucket.name + '/' + blob.name)
    

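    You may also want to look into apache_beam.io.fileio.MatchAll, which takes a PCollection of file patterns (such as gs://bucket/prefix*) and emits the metadata of every matching file, so you do not have to call the storage client yourself.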