python, google-cloud-platform, google-cloud-datastore, apache-beam

How to use a PCollection to build a Datastore query in a Beam pipeline?


I'm building a Beam pipeline to process/transform CSV files into XML files, following some rules. My approach so far has been to split the input CSV file into rows at the beginning of the pipeline and feed every row into the pipeline. In the pipeline I transform every row into an XML tag, and at the end of the pipeline I CombineGlobally everything into the final XML file. The problem now is that I need some extra information stored in Google Datastore to build the XML tag for every row of the CSV file, and I don't know how to do this, because ReadFromDatastore takes the query as a pipeline-construction-time parameter (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.gcp.datastore.v1new.datastoreio.html#apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore) while the query I need depends on the PCollection, which only exists at runtime. I need to build a query like this:

SELECT * FROM xxx WHERE id IN (values from the PCollection)

Is there a way to do this? Something like a CombineGlobally to build the query and then somehow pass it to ReadFromDatastore? Or is there any other way to do what I need?

I have something like this right now:

with beam.Pipeline(options=pipeline_options) as p:
    items = (
        p |
        'ReadCsvFile' >> beam.io.Read(CsvFileSource(input_name)) |
        'PrepareToJoin' >> beam.ParDo(PrepareToJoin())
    )

    datastore_items = (p |
        'DatastoreDataP' >> ReadFromDatastore(project_id, query)
    )

    new_items = (
        {'data': items, 'datastore': datastore_items} |
        'JoinWithDatastore' >> beam.CoGroupByKey() |
        'PostJoinProcess' >> beam.ParDo(PostJoinProcess())
    )

    xml_file = (new_items |
        'ItemToXmlTag' >> beam.ParDo(ItemToXmlTag()) |
        'MakeXMLFile' >> beam.CombineGlobally(XMLCombineFn()) |
        WriteToText(output_name)
    )

As you can see, query is a parameter that has to be known when the pipeline is constructed.

Thanks in advance for your help!


Solution

  • You can use a ParDo to achieve this. Below is a sketch of a DoFn that processes each element of the PCollection and does a Datastore lookup for it:

    
    import apache_beam as beam
    from google.cloud import datastore


    class EnrichEntities(beam.DoFn):
        """Looks up each element's Datastore entity and attaches it to the element."""

        def setup(self):
            # Create the Datastore client once per worker.
            self.client = datastore.Client()

        def process(self, element):
            # The 'Task' kind and the element's 'id' field follow the original sketch;
            # adjust them to your own schema.
            key = self.client.key('Task', element['id'])
            task = self.client.get(key)
            if task is not None:
                element['datastore_data'] = dict(task)
            return [element]
    

    Then in your pipeline you can use the ParDo as below

    with beam.Pipeline() as pipeline:
      results = (
          pipeline
          # Illustrative input; in the real pipeline these are the prepared CSV rows.
          | 'Create inputs' >> beam.Create([{'id': 'sample_task'}])
          | 'EnrichWithDatastore' >> beam.ParDo(EnrichEntities())
          | beam.Map(print)
      )

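    Applied to the pipeline from the question, this DoFn would take the place of the ReadFromDatastore / CoGroupByKey join. A rough sketch, reusing the transform names from the question and assuming PrepareToJoin emits the per-row dicts that EnrichEntities expects:

    with beam.Pipeline(options=pipeline_options) as p:
        xml_file = (
            p
            | 'ReadCsvFile' >> beam.io.Read(CsvFileSource(input_name))
            | 'PrepareToJoin' >> beam.ParDo(PrepareToJoin())
            # Per-element Datastore lookup instead of ReadFromDatastore + CoGroupByKey.
            | 'EnrichWithDatastore' >> beam.ParDo(EnrichEntities())
            | 'ItemToXmlTag' >> beam.ParDo(ItemToXmlTag())
            | 'MakeXMLFile' >> beam.CombineGlobally(XMLCombineFn())
            | 'WriteXMLFile' >> WriteToText(output_name)
        )
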
    To improve the performance of the lookups you can also use BagState, which lets you buffer elements and look up N records at a time. Here is a link describing how to implement this: https://beam.apache.org/blog/2017/08/28/timely-processing.html
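
    For reference, below is a minimal sketch of that batching pattern (not part of the original answer). It assumes the elements have been keyed beforehand (stateful DoFns require keyed input) and that each row is a dict with an id field matching the Datastore key name; the Task kind and batch size are illustrative. A complete version would also set a watermark timer to flush whatever is left in the buffer at the end of the window, as described in the linked post.

    import apache_beam as beam
    from apache_beam.coders import PickleCoder, VarIntCoder
    from apache_beam.transforms.userstate import BagStateSpec, CombiningValueStateSpec
    from google.cloud import datastore


    class BatchedEnrichEntities(beam.DoFn):
        """Buffers rows in BagState and resolves them against Datastore in batches."""

        MAX_BUFFER_SIZE = 50  # illustrative batch size

        BUFFER_STATE = BagStateSpec('buffer', PickleCoder())
        COUNT_STATE = CombiningValueStateSpec('count', VarIntCoder(), combine_fn=sum)

        def setup(self):
            self.client = datastore.Client()

        def process(self,
                    element,
                    buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
                    count_state=beam.DoFn.StateParam(COUNT_STATE)):
            # Stateful DoFns require keyed input: element is a (key, row) pair.
            _, row = element
            buffer_state.add(row)
            count_state.add(1)

            if count_state.read() >= self.MAX_BUFFER_SIZE:
                rows = list(buffer_state.read())
                # One multi-get instead of MAX_BUFFER_SIZE single lookups.
                keys = [self.client.key('Task', r['id']) for r in rows]
                found = {e.key.name: e for e in self.client.get_multi(keys)}
                for r in rows:
                    entity = found.get(r['id'])
                    if entity is not None:
                        r['datastore_data'] = dict(entity)
                    yield r
                buffer_state.clear()
                count_state.clear()
                # NOTE: rows still buffered when input ends are not emitted here;
                # a watermark timer (see the linked post) is needed to flush them.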