Tags: python, python-3.x, apache-beam, dataflow, apache-beam-io

How to read multiple files in Apache Beam from GCP bucket


I am trying to read and apply some subsetting to multiple files in GCP with Apache Beam. I prepared two pipelines which work for a single file, but fail when I try them on multiple files. Apart from this, it would be handy to combine my pipelines into one if possible, or is there a way to orchestrate them so that they run in order? The pipelines currently work locally, but my ultimate goal is to run them with Dataflow.

I tried textio.ReadFromText and textio.ReadAllFromText, but I couldn't make either of them work for multiple files.
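
Roughly, the read variants I tried look like this (simplified; the file pattern is a placeholder):

import apache_beam as beam

with beam.Pipeline() as p:
    # ReadFromText is given a file pattern directly...
    lines = p | beam.io.textio.ReadFromText("gs://my_bucket/file*.txt.gz")
    # ...while ReadAllFromText expands patterns that arrive as elements of a PCollection.
    more_lines = (p
                  | beam.Create(["gs://my_bucket/file*.txt.gz"])
                  | beam.io.textio.ReadAllFromText())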

import json

def toJson(file):
    # load the whole file as one JSON document
    with open(file) as f:
        return json.load(f)


with beam.Pipeline(options=PipelineOptions()) as p:
    files = (p
             | beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines=0)
             | beam.io.WriteToText("/home/test",
                                   file_name_suffix=".json", num_shards=1, append_trailing_newlines=True))

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
             | 'read_data' >> beam.Create(['test-00000-of-00001.json'])
             | "toJson" >> beam.Map(toJson)
             | "takeItems" >> beam.FlatMap(lambda line: line["Items"])
             | "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
             | beam.combiners.Count.PerElement()
             | beam.io.WriteToText("/home/items",
                                   file_name_suffix=".txt", num_shards=1, append_trailing_newlines=True))

These two pipelines work well for a single file, but I have a hundred files in the same format and would like to use the advantages of parallel computing.

Is there a way to make this pipeline work for multiple files under the same directory?

Is it possible to do this within a single pipeline instead of creating two different pipelines? (It is not handy to write intermediate files from the bucket to the worker nodes.)


Solution

  • I figured out how to make it work for multiple files, but I couldn't make it run within a single pipeline. I used a for loop and then the beam.Flatten option.

    Here is my solution:

    file_list = ["gs://my_bucket/file*.txt.gz"]
    res_list = ["/home/subject_test_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]
    
    with beam.Pipeline(options=PipelineOptions()) as p:
        for i, file in enumerate(file_list):
            (p
             | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines=0)
             | "Write Text {}".format(i) >> beam.io.WriteToText("/home/subject_test_{}".format(i),
                        file_name_suffix=".json", num_shards=1, append_trailing_newlines=True))
    
    pcols = []
    with beam.Pipeline(options=PipelineOptions()) as p:
        for i, res in enumerate(res_list):
            pcol = (p
                    | 'read_data_{}'.format(i) >> beam.Create([res])
                    | "toJson_{}".format(i) >> beam.Map(toJson)
                    | "takeItems_{}".format(i) >> beam.FlatMap(lambda line: line["Items"])
                    | "takeSubjects_{}".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
            pcols.append(pcol)
        out = (pcols
               | beam.Flatten()
               | beam.combiners.Count.PerElement()
               | beam.io.WriteToText("/home/items",
                          file_name_suffix=".txt", num_shards=1, append_trailing_newlines=True))