Is it possible to pass parameters in ParDo(classname(args))? I tried, but it says the process function requires 3 args and only 2 were given. I tried an __init__ function too, with no luck. Please help. If possible, let me know how to rewrite this code. NOTE: splitcols and filtercols work fine, since they take no args.
import apache_beam as beam
class splitcols(beam.DoFn):
    def process(self,elements):
        return [elements.split(',')]
class filtercols(beam.DoFn):
    def process(self,elements):
        if elements[1]=='Drs.':
            return [elements]
class addvals(beam.DoFn):
    def process(self,elements,a): #here I tried to accept the arg
        return [(elements[a],1)]
p1 = beam.Pipeline()
attendance_count = (
    p1
    |beam.io.ReadFromText('100Records.csv')
    |beam.ParDo(splitcols())
    |beam.ParDo(filtercols())
    |beam.ParDo(addvals(2)) #here I tried to give args
    #|beam.CombinePerKey(sum)
    #|beam.Map(lambda employee: str(employee))
    |beam.io.WriteToText('data/pardooutput')
)
p1.run()
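You can either pass the args to __init__, like in the comment to the question, or pass them as side inputs to the ParDo: beam.ParDo(addvals(), 2). Extra arguments given to beam.ParDo after the DoFn are forwarded to process, which matches the process(self, elements, a) signature in the question. Writing addvals(2) instead hands the 2 to the constructor, so process is still called with only the element.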
See a similar example here: https://beam.apache.org/documentation/programming-guide/#side-inputs