Tags: python, apache-beam

Passing arguments to a DoFn class in ParDo


Is it possible to pass parameters to a DoFn when calling beam.ParDo(classname(args))? I tried, but I get an error saying the process function requires 3 arguments and only 2 were given. I also tried defining an __init__ function, without success. Please help, and if possible let me know how to rewrite this code. NOTE: splitcols and filtercols work fine because they take no arguments.

import apache_beam as beam

class splitcols(beam.DoFn):
    def process(self,elements):
        return [elements.split(',')]

class filtercols(beam.DoFn):
    def process(self,elements):
        if elements[1]=='Drs.':
            return [elements]

class addvals(beam.DoFn):
    def process(self,elements,a): #here I tried to accept the arg
        return [(elements[a],1)]

p1 = beam.Pipeline()
attendance_count = (
    p1
    |beam.io.ReadFromText('100Records.csv')
    |beam.ParDo(splitcols())
    |beam.ParDo(filtercols())
    |beam.ParDo(addvals(2))  #here I tried to give args
    #|beam.CombinePerKey(sum)
    #|beam.Map(lambda employee: str(employee))
    |beam.io.WriteToText('data/pardooutput')
)
p1.run()

Solution

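  • You can either pass the argument to __init__ and store it on the instance, as suggested in the comment on the question, or pass it as an extra argument to ParDo, the same way side inputs are passed: beam.ParDo(addvals(), 2). Beam forwards any extra ParDo arguments to process after the element, so process(self, elements, a) then receives a=2.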

    See a similar example here: https://beam.apache.org/documentation/programming-guide/#side-inputs