I am trying to read and subset multiple files in GCP with Apache Beam. I prepared two pipelines which work for a single file, but fail when I try them on multiple files. Apart from this, it would be handy to combine my pipelines into one if possible, or to orchestrate them so that they run in order. For now the pipelines run locally, but my ultimate goal is to run them on Dataflow.
I tried textio.ReadFromText and textio.ReadAllFromText, but I couldn't make either of them work for multiple files.
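As far as I understand, ReadAllFromText expects a PCollection of file names or patterns rather than a single path argument, so I imagine it would be used roughly like this (untested sketch; the file names are just placeholders for my own files):

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

with beam.Pipeline() as p:
    lines = (p
             # create a PCollection of file names/patterns, then read them all
             | beam.Create(["gs://my_bucket/file1.txt.gz", "gs://my_bucket/file2.txt.gz"])
             | ReadAllFromText())

I could not get this to behave the way I wanted, so here are the two pipelines that do work for a single file: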
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def toJson(file):
    # read the whole (local) file and parse it as a single JSON document
    with open(file) as f:
        return json.load(f)

# Pipeline 1: read the compressed file from GCS and write it out as a local JSON file
with beam.Pipeline(options=PipelineOptions()) as p:
    files = (p
             | beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines=0)
             | beam.io.WriteToText("/home/test",
                                   file_name_suffix=".json", num_shards=1, append_trailing_newlines=True))

# Pipeline 2: parse the intermediate JSON file, extract the subjects and count them
with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
             | 'read_data' >> beam.Create(['test-00000-of-00001.json'])
             | "toJson" >> beam.Map(toJson)
             | "takeItems" >> beam.FlatMap(lambda line: line["Items"])
             | "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
             | beam.combiners.Count.PerElement()
             | beam.io.WriteToText("/home/items",
                                   file_name_suffix=".txt", num_shards=1, append_trailing_newlines=True))
These two pipelines work well for a single file, but I have hundreds of files in the same format and would like to take advantage of parallel computing.
Is there a way to make this pipeline work for multiple files under the same directory?
Is it possible to do this within a single pipeline instead of creating two separate ones? (Writing intermediate files from the bucket to the worker nodes is not convenient.)
I figured out how to make it work for multiple files, but I still couldn't make it run within a single pipeline. I used a for loop and then beam.Flatten.
Here is my solution:
file_list = ["gs://my_bucket/file*.txt.gz"]  # a single glob pattern that matches all the input files
res_list = ["/home/subject_test_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]

# Pipeline 1: copy each matched pattern to a local JSON file
with beam.Pipeline(options=PipelineOptions()) as p:
    for i, file in enumerate(file_list):
        (p
         | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines=0)
         | "Write Text {}".format(i) >> beam.io.WriteToText("/home/subject_test_{}".format(i),
                                                            file_name_suffix=".json", num_shards=1, append_trailing_newlines=True))

# Pipeline 2: parse each intermediate file, flatten the PCollections and count the subjects
pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
    for i, res in enumerate(res_list):
        pcol = (p
                | 'read_data_{}'.format(i) >> beam.Create([res])
                | "toJson_{}".format(i) >> beam.Map(toJson)
                | "takeItems_{}".format(i) >> beam.FlatMap(lambda line: line["Items"])
                | "takeSubjects_{}".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
        pcols.append(pcol)
    out = (pcols
           | beam.Flatten()
           | beam.combiners.Count.PerElement()
           | beam.io.WriteToText("/home/items",
                                 file_name_suffix=".txt", num_shards=1, append_trailing_newlines=True))