Tags: python, apache-spark, pyspark, distributed-computing, rdd

How to group and add up in Spark?


I have an RDD like this:

{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }

{"key1" : "fruit" , "key2" : "US" , "key3" : "2" }

{"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }

{"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }

{"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }

My goal is to group by key1 and key2, and then sum up key3.

I am expecting a final result like this:

key1          key2      key3
"fruit"     , "US"    , 3
"vegetable" , "US"    , 1
"fruit"     , "Japan" , 3
"vegetable" , "Japan" , 3

My code begins as below:

rdd_arm = rdd_arm.map(lambda x: x[1])

rdd_arm contains records in the above key/value format.

I am not sure where to go next. Could someone help me out?


Solution

  • Let's create your RDD:

    In [1]: rdd_arm = sc.parallelize([{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "US" , "key3" : "2" }, {"key1" : "vegetable" , "key2" : "US" ,  "key3" : "1" }, {"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }, {"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }])
    In [2]: rdd_arm.collect()
    Out[2]: 
    [{'key1': 'fruit', 'key2': 'US', 'key3': '1'},
     {'key1': 'fruit', 'key2': 'US', 'key3': '2'},
     {'key1': 'vegetable', 'key2': 'US', 'key3': '1'},
     {'key1': 'fruit', 'key2': 'Japan', 'key3': '3'},
     {'key1': 'vegetable', 'key2': 'Japan', 'key3': '3'}]
    

    First, you have to create a new key, which will be the pair of key1 and key2. Its value will be key3, converted to an integer since the input values are strings, so you want to do something like this:

    In [3]: new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], int(x['key3'])))
    
    In [4]: new_rdd.collect()
    Out[4]: 
    [('fruit, US', 1),
     ('fruit, US', 2),
     ('vegetable, US', 1),
     ('fruit, Japan', 3),
     ('vegetable, Japan', 3)]
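
    As an aside, a tuple such as (x['key1'], x['key2']) would also work as the key, and it avoids any ambiguity if a value ever contains the ", " separator. A minimal sketch of the same step with tuple keys (the name pair_rdd is just for illustration, same rdd_arm as above):

    pair_rdd = rdd_arm.map(lambda x: ((x['key1'], x['key2']), int(x['key3'])))
    # collect() would give pairs like (('fruit', 'US'), 1), (('fruit', 'US'), 2), ...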
    

    Then, we want to add up the values of the duplicate keys, simply by calling reduceByKey(). Note that the lambda only runs for keys that occur more than once, which is why key3 was converted to int in the map step rather than here:

    In [5]: new_rdd = new_rdd.reduceByKey(lambda a, b: a + b)
    
    In [6]: new_rdd.collect()
    Out[6]: 
    [('fruit, US', 3),
     ('fruit, Japan', 3),
     ('vegetable, US', 1),
     ('vegetable, Japan', 3)]
    

    and we are done!
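
    If you want the result back in the three-column shape from the question, one more map can split the combined key again. A small sketch, relying on the ", " separator chosen in In [3]:

    result = new_rdd.map(lambda kv: tuple(kv[0].split(", ")) + (kv[1],))
    # collect() would give triples like ('fruit', 'US', 3), ('vegetable', 'US', 1), ...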


    Of course, this could be a one-liner, like this:

    new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], int(x['key3']))).reduceByKey(lambda a, b: a + b)
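
    For completeness, the same aggregation is often expressed with the DataFrame API, which takes care of the grouping and summing for you. A sketch, assuming a SparkSession named spark is available (recent PySpark shells create one automatically):

    from pyspark.sql import functions as F

    # build a DataFrame from the RDD, casting key3 to int up front
    df = spark.createDataFrame(
        rdd_arm.map(lambda x: (x['key1'], x['key2'], int(x['key3']))),
        ['key1', 'key2', 'key3'])

    df.groupBy('key1', 'key2').agg(F.sum('key3').alias('key3')).show()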