I need to compare two text files and then load the result into BigQuery. Let's say I have an emp text file (enum, ename, edept, esal) containing the data below; the join key is in the middle of the record (index = 2).
100,abc,d10,7000
120,xyz,d20,5000
The other file is dept (dnum, dname), containing the data below.
d10,IT
d20,engineering
As per Beam behavior, I think the key should be moved to the first position so it can be compared with the other file. I am able to move the key to the first position, but I am not able to merge. Here is my code:
class splitrow(beam.DoFn):
    def process(self, lines):
        return [lines.split(',')]

class formatinput(beam.DoFn):
    def process(self, emprow):
        (key, value) = (emprow[2], (emprow[0], emprow[1], emprow[3]))
        return [(key, value)]

class findkv(beam.DoFn):
    def process(self, deptrow):
        (key, value) = (deptrow)
        return [(key, value)]

# pipelineOptions
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

# Dataflow pipelines
input1_emp_collection = (
    p
    | "ReadEMPfile" >> beam.io.ReadFromText('gs://pybeam_bucket/emp.txt')
    | "SplitEMPFile" >> beam.ParDo(splitrow())
    | "FormatEMPFile" >> beam.ParDo(formatinput())
    | "Print1" >> beam.Map(print)
)

input2_dept_collection = (
    p
    | "ReadDEPTfile" >> beam.io.ReadFromText('gs://pybeam_bucket/dept.txt')
    | "SplitDEPTFile" >> beam.ParDo(splitrow())
    | "FormatDEPTFile" >> beam.ParDo(findkv())
    | "Print2" >> beam.Map(print)
)

result = (({
    'emp': input1_emp_collection, 'dept': input2_dept_collection
    })
    | "Merge" >> beam.CoGroupByKey()
    | beam.Map(print)
)

# Run a pipeline
if __name__ == "__main__":
    p.run()
When I run the code I get the error below:
TypeError: cannot unpack non-iterable NoneType object [while running 'Merge/pair_with_emp']
Please help me compare the two text files when one of the keys is in the middle of the input file, and load the matching records into BigQuery. I am new to Apache Beam too. Thanks for your support.
The problem is with | "Print1" >> beam.Map(print) and | "Print2" >> beam.Map(print). When you add these lines at the end of a pipeline, beam.Map replaces each element with the return value of print, which is None, so you are sending None elements into the CoGroupByKey(). You have to remove them.
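If you still want to inspect the elements inline, one option (a sketch, using a hypothetical debug_print helper not in the original post) is a Map function that prints and then returns the element, so the (key, value) pairs keep flowing downstream:

def debug_print(element):
    # Print for debugging, then return the element unchanged
    # so downstream transforms still receive the (key, value) pair.
    print(element)
    return element

# e.g. | "Print1" >> beam.Map(debug_print)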
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class splitrow(beam.DoFn):
    def process(self, lines):
        return [lines.split(',')]

class formatinput(beam.DoFn):
    def process(self, emprow):
        (key, value) = (emprow[2], (emprow[0], emprow[1], emprow[3]))
        return [(key, value)]

class findkv(beam.DoFn):
    def process(self, deptrow):
        (key, value) = (deptrow)
        return [(key, value)]

# pipelineOptions
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

# Dataflow pipelines
input1_emp_collection = (
    p
    | "ReadEMPfile" >> beam.io.ReadFromText('gs://<bucket>/emp.txt')
    | "SplitEMPFile" >> beam.ParDo(splitrow())
    | "FormatEMPFile" >> beam.ParDo(formatinput())
)

input2_dept_collection = (
    p
    | "ReadDEPTfile" >> beam.io.ReadFromText('gs://<bucket>/dept.txt')
    | "SplitDEPTFile" >> beam.ParDo(splitrow())
    | "FormatDEPTFile" >> beam.ParDo(findkv())
)

# Debug line that shows the Nones
# result = input1_emp_collection | beam.Map(print)

result = (({'emp': input1_emp_collection, 'dept': input2_dept_collection})
    | "Merge" >> beam.CoGroupByKey()
    | beam.Map(print)
)

# Run a pipeline
if __name__ == "__main__":
    p.run()
Result:
python teste2.py
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
('d10', {'emp': [('100', 'abc', '7000')], 'dept': ['IT']})
('d20', {'emp': [('120', 'xyz', '5000')], 'dept': ['engineering']})
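To finish the load into BigQuery, one approach (a sketch only; the joined_to_rows helper, the result_pcollection name, the table name, and the schema are assumptions, not from the original post) is to branch off the output of "Merge" >> beam.CoGroupByKey() itself, flatten each grouped element into row dictionaries, and write them with beam.io.WriteToBigQuery:

def joined_to_rows(element):
    # element looks like ('d10', {'emp': [('100', 'abc', '7000')], 'dept': ['IT']})
    dept_key, grouped = element
    for enum, ename, esal in grouped['emp']:
        for dname in grouped['dept']:  # emits rows only when both sides have a match
            yield {
                'enum': enum,
                'ename': ename,
                'edept': dept_key,
                'dname': dname,
                'esal': esal,
            }

bq_rows = (result_pcollection  # the PCollection produced by "Merge" >> beam.CoGroupByKey()
    | "ToBQRows" >> beam.FlatMap(joined_to_rows)
    | "WriteToBQ" >> beam.io.WriteToBigQuery(
        'my-project:my_dataset.emp_dept',  # assumed table reference
        schema='enum:STRING,ename:STRING,edept:STRING,dname:STRING,esal:STRING',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

Note that the trailing beam.Map(print) in the pipeline above again produces None elements, so the BigQuery branch should be attached to the CoGroupByKey output directly, not after the print step.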