
Calculating an Average with a Combiner in MapReduce


I have a .csv source file in the form of:

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,30.95,1,MATT,MORAL,CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,MATT,MORAL,CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,89.95,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,54.50,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,2,TOM,SON,FLACQ
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD,PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,1,DYDY,ARD,PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD,PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,2,TAY,ANA,VACOAS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,35.00,3,TAY,ANA,VACOAS

I would like to calculate the weighted average cost for each person, i.e. sum(price * qty) / sum(qty), using a combiner in MapReduce, with the following result (a quick arithmetic check follows the expected output):

MATT MORAL 25.45
LELA SMI 72.225
TOM SON 19.95
DYDY ARD 20.63
TAY ANA 29.8
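
As a quick sanity check on the first key, the same weighted average can be computed in plain Python (the (price, qty) pairs below are just read off the two MATT MORAL rows):

    # (price, qty) pairs from the two MATT MORAL rows above
    purchases = [(30.95, 1), (19.95, 1)]
    total_cost = sum(price * qty for price, qty in purchases)  # 50.90
    total_qty = sum(qty for _, qty in purchases)               # 1 + 1 = 2
    print(round(total_cost / total_qty, 2))                    # 25.45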

So I came up with the following code, which is not working (it gives me double the average). I feel like I need to add an IF/ELSE in the reducer to process the output of the combiner (unique keys) differently from the output of the mapper (duplicated keys):

from mrjob.job import MRJob
class Job(MRJob):
    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0,0
        for value in values:
            totalprice += (value[0] * value[1])
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0,0
        for value in values:
            totalprice += (value[0] * value[1])
            totalqty += value[1]
        average = round(totalprice/totalqty,2)
        yield key, average
        
if __name__ == '__main__':
    Job.run()

Grateful if you could give me some guidance with the reducer!


Solution

  • You shouldn't be weighting totalprice in the reducer, as you have already done that in the combiner:

    def reducer(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0]  # the combiner has already weighted price by qty
            totalqty += value[1]
        average = round(totalprice / totalqty, 2)
        yield key, average
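
    To see why the original job doubled the average, here is a plain-Python sketch (not mrjob; the numbers are just the MATT MORAL rows) of what flows through each stage, assuming the combiner runs exactly once on that mapper's output:

        # mapper output for 'MATT MORAL': raw (price, qty) pairs
        mapper_out = [(30.95, 1), (19.95, 1)]

        # combiner: weight price by qty and sum both fields
        combined = (sum(p * q for p, q in mapper_out),  # 50.90
                    sum(q for _, q in mapper_out))      # 2

        # the original reducer re-weighted: (50.90 * 2) / 2 = 50.90, double 25.45
        buggy = (combined[0] * combined[1]) / combined[1]

        # the fixed reducer sums the already-weighted total: 50.90 / 2 = 25.45
        fixed = combined[0] / combined[1]
        print(buggy, fixed)  # 50.9 25.45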
    

    Some more explanation

    Here is what the Hadoop docs say about using a "Combiner":

    Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

    Introducing a combiner into the mix works only if your reduce operation can be broken down into multiple "mini reduces" without changing the end result. Note also that Hadoop treats the combiner as an optional optimization: it may run zero, one, or several times, so the reducer has to produce the right answer whether or not the combiner ran. Raw averages don't compose this way (an average of averages is generally wrong), but (sum of price*qty, sum of qty) pairs do, as the sketch below shows.
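
    A minimal illustration in plain Python (the partition names are mine) of why the (total, qty) pairs merge cleanly while per-partition averages would not:

        # the three DYDY ARD purchases, split across two hypothetical mappers
        part_a = [(19.95, 1), (22.00, 1)]
        part_b = [(19.95, 1)]

        def mini_reduce(pairs):
            # (sum of price * qty, sum of qty) for one partition
            return (sum(p * q for p, q in pairs), sum(q for _, q in pairs))

        a, b = mini_reduce(part_a), mini_reduce(part_b)
        merged = (a[0] + b[0], a[1] + b[1])     # partial results merge exactly
        print(round(merged[0] / merged[1], 2))  # 20.63, same as one big reduce

        # averaging the per-partition averages instead gives the wrong answer:
        print(round((a[0] / a[1] + b[0] / b[1]) / 2, 2))  # 20.46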

    If you want your combiner and reducer functions to be identical (so the job is correct whether or not the combiner runs), you need to make the change in the mapper instead.

    Something like this:

    from mrjob.job import MRJob

    class Job(MRJob):
        def mapper(self, key, value):
            words = value.strip().split(',')
            full_name = words[-3] + ' ' + words[-2]
            price, qty = float(words[-5]), int(words[-4])
            price = price * qty  # this is the change: weight the price once, here
            yield full_name, (price, qty)

        def combiner(self, key, values):
            totalprice, totalqty = 0, 0
            for value in values:
                totalprice += value[0]  # already weighted by the mapper
                totalqty += value[1]
            yield key, (totalprice, totalqty)

        def reducer(self, key, values):
            totalprice, totalqty = 0, 0
            for value in values:
                totalprice += value[0]  # correct for mapper or combiner output
                totalqty += value[1]
            average = round(totalprice / totalqty, 2)
            yield key, average

    if __name__ == '__main__':
        Job.run()
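
    With purely hypothetical file names (say the job is saved as average_cost.py and the data as sales.csv), you can test this locally with "python average_cost.py sales.csv", or drive it programmatically; here is a sketch using mrjob's documented runner API (0.6+):

        # a minimal sketch, assuming the job class above lives in average_cost.py
        from average_cost import Job

        job = Job(args=['sales.csv'])  # sales.csv is a hypothetical input file
        with job.make_runner() as runner:
            runner.run()
            for name, avg in job.parse_output(runner.cat_output()):
                print(name, avg)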