I'm building a Beam pipeline to process/transform CSV files into XML files, following some rules. My approach so far has been to split the input CSV file into rows at the beginning of the pipeline and feed every row through it. In the pipeline I transform every row into an XML tag, and at the end of the pipeline I CombineGlobally everything into the final XML file. The problem now is that I need some extra information stored in Google Datastore to build the XML tag for every row in the CSV file, and I don't know how to do this, because the query to retrieve data from Datastore is a runtime parameter (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore) and the query depends on the PCollection. I need to build a query like this:
Select from xxx where id in PCollection
Is there a way to do this? Something like a CombineGlobally to build the query, and then pass the query somehow to the ReadFromDatastore function? Or is there any other way to do what I need?
I have something like this right now:
with beam.Pipeline(options=pipeline_options) as p:
    items = (
        p |
        'ReadCsvFile' >> beam.io.Read(CsvFileSource(input_name)) |
        'PrepareToJoin' >> beam.ParDo(PrepareToJoin())
    )
    datastore_items = (
        p |
        'DatastoreDataP' >> ReadFromDatastore(project_id, query)
    )
    new_items = (
        {'data': items, 'datastore': datastore_items} |
        'JoinWithDatastore' >> beam.CoGroupByKey() |
        'PostJoinProcess' >> beam.ParDo(PostJoinProcess())
    )
    xml_file = (
        new_items |
        'ItemToXmlTag' >> beam.ParDo(ItemToXmlTag()) |
        'MakeXMLFile' >> beam.CombineGlobally(XMLCombineFn()) |
        WriteToText(output_name)
    )
As you can see, query is a parameter in the pipeline.
Thanks in advance for your help!
You can use a ParDo to achieve this. Below is pseudo code for a DoFn that processes each element of the PCollection and does a Datastore lookup:
from google.cloud import datastore

class EnrichEntities(beam.DoFn):
    """Enriches each element with data looked up from Datastore."""
    def setup(self):
        # Create the client once per worker instead of once per element
        self.client = datastore.Client()

    def process(self, element):
        # Illustrative lookup: kind 'Task' keyed by the element's id
        key = self.client.key('Task', element['id'])
        task = self.client.get(key)
        if task is not None:
            element['task'] = dict(task)
        return [element]
Then in your pipeline you can use the ParDo as below
with beam.Pipeline() as pipeline:
    results = (
        pipeline
        # inputs: whatever list of elements you want to enrich
        | 'Create inputs' >> beam.Create(inputs)
        | 'DoFn methods' >> beam.ParDo(EnrichEntities())
        | beam.Map(print)
    )
To improve the performance of the lookups you can also use BagState, which lets you buffer elements and look up N records at a time. Here is a link that shows how to implement this: https://beam.apache.org/blog/2017/08/28/timely-processing.html
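For reference, below is a rough, untested sketch of that batched-lookup pattern in the Python SDK, loosely following the stateful processing post linked above. It assumes the input PCollection is already keyed as (id, row) tuples; the kind 'Task', the batch size, and the 'task' field it fills in are all placeholders to adapt to your own schema:

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.combiners import CountCombineFn
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer)
from google.cloud import datastore

class BatchedDatastoreLookup(beam.DoFn):
    """Buffers keyed elements and resolves them against Datastore in batches."""
    BATCH_SIZE = 50  # placeholder batch size
    BUFFER = BagStateSpec('buffer', beam.coders.PickleCoder())
    COUNT = CombiningValueStateSpec('count', VarIntCoder(), CountCombineFn())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def setup(self):
        self.client = datastore.Client()

    def process(self, element,
                buffer=beam.DoFn.StateParam(BUFFER),
                count=beam.DoFn.StateParam(COUNT),
                flush=beam.DoFn.TimerParam(FLUSH),
                window=beam.DoFn.WindowParam):
        # Buffer the (id, row) element and flush once BATCH_SIZE is reached,
        # or at the end of the window via the timer below.
        buffer.add(element)
        count.add(1)
        flush.set(window.end)
        if count.read() >= self.BATCH_SIZE:
            for output in self._flush(buffer, count):
                yield output

    @on_timer(FLUSH)
    def on_flush(self,
                 buffer=beam.DoFn.StateParam(BUFFER),
                 count=beam.DoFn.StateParam(COUNT)):
        for output in self._flush(buffer, count):
            yield output

    def _flush(self, buffer, count):
        elements = list(buffer.read())
        buffer.clear()
        count.clear()
        # One get_multi call per batch; 'Task' is an illustrative kind
        keys = [self.client.key('Task', elem_id) for elem_id, _ in elements]
        found = {e.key.id_or_name: e for e in self.client.get_multi(keys)}
        for elem_id, row in elements:
            row['task'] = dict(found[elem_id]) if elem_id in found else None
            yield (elem_id, row)

Note that stateful DoFns require a keyed PCollection, so you would first key your rows (for example with beam.Map(lambda row: (row['id'], row))) before applying beam.ParDo(BatchedDatastoreLookup()).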