google-cloud-dataflow apache-beam dataflow apache-beam-io

Compare 2 text files when the key column is in the middle, using Apache Beam (Python)


I need to compare 2 text files and then load the result into BQ. Let's say I have an emp text file (enum, ename, edept, esal) that contains the data below. The key is in the middle of each row (index 2):

100,abc,d10,7000
120,xyz,d20,5000

The other file, dept (dnum, dname), contains the data below:

d10,IT
d20,engineering

From how Beam works, I think the key has to be moved to the first column before it can be compared with the other file. I am able to move it to the first column, but I am not able to merge the two files. Here is my code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class splitrow(beam.DoFn):
    def process(self, lines):
        return [lines.split(',')]

class formatinput(beam.DoFn):
    def process(self, emprow):
        (key,value) = (emprow[2], (emprow[0], emprow[1], emprow[3]))
        return [(key,value)]

class findkv(beam.DoFn):
    def process(self, deptrow):
        (key,value) = (deptrow)
        return [(key,value)]

# pipelineOptions
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

#Dataflow pipelines
input1_emp_collection = (
    p
    | "ReadEMPfile" >> beam.io.ReadFromText('gs://pybeam_bucket/emp.txt')
    | "SplitEMPFile" >> beam.ParDo(splitrow())
    | "FormatEMPFile" >> beam.ParDo(formatinput())
    | "Print1" >> beam.Map(print)                
)
input2_dept_collection = (
    p
    | "ReadDEPTfile" >> beam.io.ReadFromText('gs://pybeam_bucket/dept.txt')
    | "SplitDEPTFile" >> beam.ParDo(splitrow())
    | "FormatDEPTFile" >> beam.ParDo(findkv())
    | "Print2" >> beam.Map(print)                
)

result = (({
      'emp': input1_emp_collection, 'dept': input2_dept_collection
  })
  | "Merge" >> beam.CoGroupByKey()
  | beam.Map(print)
  )

# Run a pipeline
if __name__ == "__main__":
    p.run()

When I run the code, I get the following error:

TypeError: cannot unpack non-iterable NoneType object [while running 'Merge/pair_with_emp']

Please help me compare the two text files when the key is in the middle of one input file, and load the matching records into BQ once they are identified. I am also new to Apache Beam. Thanks for your support.


Solution

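  • The problem is the | "Print1" >> beam.Map(print) and | "Print2" >> beam.Map(print) steps. When you add these lines at the end of each chain, input1_emp_collection and input2_dept_collection are no longer the keyed PCollections: they are the outputs of Map(print), and because print returns None, every element in them is None. CoGroupByKey() then tries to unpack each None as a (key, value) pair, which raises the TypeError. Remove those two lines; if you want to print while debugging, apply beam.Map(print) as a separate step instead of at the end of the assignment.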

    import apache_beam as beam 
    from apache_beam.options.pipeline_options import PipelineOptions
    
    
    class splitrow(beam.DoFn):
        def process(self, lines):
            return [lines.split(',')]
    
    class formatinput(beam.DoFn):
        def process(self, emprow):
            (key,value) = (emprow[2], (emprow[0], emprow[1], emprow[3]))
            return [(key,value)]
    
    class findkv(beam.DoFn):
        def process(self, deptrow):
            (key,value) = (deptrow)
            return [(key,value)]
    
    
    # pipelineOptions
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    
    #Dataflow pipelines
    input1_emp_collection = (
        p
        | "ReadEMPfile" >> beam.io.ReadFromText('gs://<bucket>/emp.txt')
        | "SplitEMPFile" >> beam.ParDo(splitrow())
        | "FormatEMPFile" >> beam.ParDo(formatinput())       
    )
    input2_dept_collection = (
        p
        | "ReadDEPTfile" >> beam.io.ReadFromText('gs://<bucket>/dept.txt')
        | "SplitDEPTFile" >> beam.ParDo(splitrow())
        | "FormatDEPTFile" >> beam.ParDo(findkv())                
    )
    
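    # Debug line: printing as a separate step is fine, but 'result' here would be
    # a PCollection of None values, because print returns None
    #result = input1_emp_collection | beam.Map(print)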
    
    result = (({'emp': input1_emp_collection, 'dept': input2_dept_collection})
        | "Merge" >> beam.CoGroupByKey()
        | beam.Map(print)
    )
    
    
    # Run a pipeline
    if __name__ == "__main__":
        p.run()
    

    Result:

    python teste2.py 
    WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
    ('d10', {'emp': [('100', 'abc', '7000')], 'dept': ['IT']})
    ('d20', {'emp': [('120', 'xyz', '5000')], 'dept': ['engineering']})