Tags: python, google-cloud-platform, bigdata, pipeline, apache-beam

How to write each tagged output to different file in Apache beam


I have this code, which tags outputs based on data in the input file:

import json

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput

class filters(beam.DoFn):
    def process(self, element):
        data = json.loads(element)
        # Tag each element with the value of its 'EventName' field
        yield TaggedOutput(data['EventName'], element)

I need help with the next step: writing the resulting tagged outputs:

tagged = lines | beam.ParDo(filters()).with_outputs('How can I dynamically access these tags?')

So, as you can see, when I call .with_outputs() I don't know in advance how many tags there will be or what their names are, so I can't write things like:

tag1 = tagged.tag1

Thank you for your help

UPDATE: this won't work, because .with_outputs() is called with no arguments:

tagged_data = lines | 'tagged data by key' >> beam.ParDo(filters()).with_outputs()

for tag in tagged_data:
    print('something')

output: WARNING:apache_beam.options.pipeline_options:Discarding unparseable args

but this will work

tagged_data = lines | 'tagged data by key' >> beam.ParDo(filters()).with_outputs('tag1', 'tag2')

for tag in tagged_data:
    print('something')

output:
  something
  something

Solution

  • Apache Beam pipeline execution is deferred: a DAG of operations to execute is built up, and nothing actually happens until you run your pipeline. (In Beam Python, the run is typically invoked implicitly at the end of a with beam.Pipeline(...) block.) PCollections don't actually contain data, just instructions for how the data should be computed.

    In particular, this means that when you write

    tagged = lines | beam.ParDo(filters()).with_outputs(...)
    

    tagged doesn't actually contain any data, rather it contains references to the PCollections that will be produced (and further processing steps can be added to them). The data in lines has not actually been computed or read yet so you can't (during pipeline construction) figure out what the set of outputs is.
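    A consequence is that the set of tags must be enumerable at pipeline-construction time. A minimal sketch of that workaround, using a hypothetical TagByEvent DoFn, made-up tag names, and a temporary output directory (none of these names come from your code):

    ```python
    import json
    import tempfile

    import apache_beam as beam
    from apache_beam.pvalue import TaggedOutput


    class TagByEvent(beam.DoFn):
        def process(self, element):
            # Tag each element with the value of its 'EventName' field.
            yield TaggedOutput(json.loads(element)['EventName'], element)


    # The tags have to be listed up front, before the pipeline runs.
    KNOWN_TAGS = ['signup', 'purchase']  # hypothetical tag names

    outdir = tempfile.mkdtemp()
    with beam.Pipeline() as p:
        lines = p | beam.Create([
            '{"EventName": "signup", "user": 1}',
            '{"EventName": "purchase", "user": 2}',
        ])
        tagged = lines | beam.ParDo(TagByEvent()).with_outputs(*KNOWN_TAGS)
        for tag in KNOWN_TAGS:
            # Indexing by tag works here only because the tag names were
            # declared in with_outputs() during construction.
            tagged[tag] | f'write {tag}' >> beam.io.WriteToText(f'{outdir}/{tag}')
    ```

    This is why your second snippet iterates and the first does not: with explicit tags, the DoOutputsTuple knows its members during construction; with empty with_outputs(), the tags are only discovered at runtime, after iteration has already happened.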

    It's not clear from the question what your end goal is, but if you're trying to partition outputs into different files by key, you may want to look into dynamic destinations (for example, apache_beam.io.fileio.WriteToFiles).
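    A sketch of that approach with apache_beam.io.fileio.WriteToFiles (the input records and output directory are invented for illustration). The destination callable routes each record by its EventName, so the set of destinations never needs to be declared up front:

    ```python
    import json
    import tempfile

    import apache_beam as beam
    from apache_beam.io import fileio

    outdir = tempfile.mkdtemp()
    with beam.Pipeline() as p:
        (p
         | beam.Create([
             '{"EventName": "signup", "user": 1}',
             '{"EventName": "purchase", "user": 2}',
           ])
         | fileio.WriteToFiles(
             path=outdir,
             # Compute the destination from each record's data; new event
             # names simply become new destinations at runtime.
             destination=lambda line: json.loads(line)['EventName'],
             sink=lambda dest: fileio.TextSink(),
             # Prefix each output file with its destination name.
             file_naming=fileio.destination_prefix_naming()))
    ```

    Each distinct EventName ends up in its own file (or set of shards) under outdir, without the pipeline author ever listing the event names.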