Tags: apache-spark, pyspark, rdd

Reducing values in lists of (key, val) pairs, where these lists are the values of another (key, val) RDD


I've been racking my brain over this for a while and would really appreciate any suggestions! Sorry for the long title; I hope the short example below explains the problem much better.

Let's say we have an RDD of the following form:

data = sc.parallelize([(1,[('k1',4),('k2',3),('k1',2)]),\
           (2,[('k3',1),('k3',8),('k1',6)])])
data.collect()

Output:

[(1, [('k1', 4), ('k2', 3), ('k1', 2)]),
 (2, [('k3', 1), ('k3', 8), ('k1', 6)])]

I am looking to apply the following to the innermost lists of (key, val) pairs:

.reduceByKey(lambda a, b: a + b)

(i.e. reduce the values of these inner pairs by key to get the per-key sums, while keeping the result mapped to the keys of the outer RDD, which would produce the following output):

[(1, [('k1', 6), ('k2', 3)]),
 (2, [('k3', 9), ('k1', 6)])]

I'm relatively new to PySpark and am probably missing something basic here. I've tried a number of different approaches, but I can't find a way to apply reduceByKey to (key, val) pairs in a list that is itself a value of another RDD.
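
For reference, reduceByKey does exactly what I want on a flat pair RDD; a minimal sketch reusing the values from the first inner list above (the variable name flat is just illustrative):

flat = sc.parallelize([('k1', 4), ('k2', 3), ('k1', 2)])
# Summing values per key works directly on a flat pair RDD
flat.reduceByKey(lambda a, b: a + b).collect()
# [('k1', 6), ('k2', 3)]   (element order may vary)

The difficulty is that in my data the pairs sit inside list values, so this call can't be applied directly.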

Many thanks in advance!

Denys


Solution

  • What you are trying to do is: your value (in the input (K, V)) is an iterable that you want to sum by its inner key, returning the result as

    (outer_key (e.g. 1, 2) -> List((inner_key (e.g. "K1", "K2"), summed_value)))

    Since the sum is calculated on the inner key-value pairs, we can achieve this by:

    First, peeling the elements out of each list item

    => making a new key of (outer_key, inner_key)

    => summing over (outer_key, inner_key) -> value

    => changing the data format back to (outer_key -> (inner_key, summed_value))

    => finally, grouping again on the outer key

    (Each of these steps is shown one at a time in the PySpark sketch at the end of this answer.)

    I am not sure about the Python version, but I believe simply replacing the Scala collection syntax with Python's would suffice. Here is the solution:

    SCALA VERSION

    scala> val keySeq = Seq((1,List(("K1",4),("K2",3),("K1",2))),
         | (2,List(("K3",1),("K3",8),("K1",6))))
    keySeq: Seq[(Int, List[(String, Int)])] = List((1,List((K1,4), (K2,3), (K1,2))), (2,List((K3,1), (K3,8), (K1,6))))
    
    scala> val inRdd = sc.parallelize(keySeq)
    inRdd: org.apache.spark.rdd.RDD[(Int, List[(String, Int)])] = ParallelCollectionRDD[111] at parallelize at <console>:26
    
    scala> inRdd.take(10)
    res64: Array[(Int, List[(String, Int)])] = Array((1,List((K1,4), (K2,3), (K1,2))), (2,List((K3,1), (K3,8), (K1,6))))
    
    
    // And solution :
    scala> inRdd.flatMap { case (i,l) => l.map(l => ((i,l._1),l._2)) }.reduceByKey(_+_).map(x => (x._1._1 ->(x._1._2,x._2))).groupByKey.map(x => (x._1,x._2.toList.sortBy(x =>x))).collect()
    
    // RESULT ::
    res65: Array[(Int, List[(String, Int)])] = Array((1,List((K1,6), (K2,3))), (2,List((K1,6), (K3,9))))
    

    UPDATE => Python Solution

    >>> data = sc.parallelize([(1,[('k1',4),('k2',3),('k1',2)]),\
    ...            (2,[('k3',1),('k3',8),('k1',6)])])
    >>> data.collect()
    [(1, [('k1', 4), ('k2', 3), ('k1', 2)]), (2, [('k3', 1), ('k3', 8), ('k1', 6)])]
    
    # Similar operation
    
    >>> (data.flatMap(lambda x: [((x[0], y[0]), y[1]) for y in x[1]])
    ...      .reduceByKey(lambda a, b: a + b)
    ...      .map(lambda x: (x[0][0], (x[0][1], x[1])))
    ...      .groupByKey()
    ...      .mapValues(list)
    ...      .collect())
    
    # RESULT 
    [(1, [('k1', 6), ('k2', 3)]), (2, [('k3', 9), ('k1', 6)])]
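
    To make the chained call easier to follow, here is the same pipeline as a sketch split into named stages (the variable names are only illustrative; the intermediate results in the comments are what the example data produces, though element order may vary between runs):

    # 1. Flatten: one record per inner pair, keyed by (outer_key, inner_key)
    pairs = data.flatMap(lambda x: [((x[0], y[0]), y[1]) for y in x[1]])
    # [((1, 'k1'), 4), ((1, 'k2'), 3), ((1, 'k1'), 2),
    #  ((2, 'k3'), 1), ((2, 'k3'), 8), ((2, 'k1'), 6)]

    # 2. Sum the values per (outer_key, inner_key)
    summed = pairs.reduceByKey(lambda a, b: a + b)
    # [((1, 'k1'), 6), ((1, 'k2'), 3), ((2, 'k3'), 9), ((2, 'k1'), 6)]

    # 3. Re-key by outer_key, keeping (inner_key, summed_value) as the value
    rekeyed = summed.map(lambda x: (x[0][0], (x[0][1], x[1])))
    # [(1, ('k1', 6)), (1, ('k2', 3)), (2, ('k3', 9)), (2, ('k1', 6))]

    # 4. Group back on outer_key and turn the grouped values into lists
    result = rekeyed.groupByKey().mapValues(list)
    result.collect()
    # [(1, [('k1', 6), ('k2', 3)]), (2, [('k3', 9), ('k1', 6)])]

    Since each inner list is an ordinary Python list inside a single record (not an RDD of its own), an alternative sketch is to aggregate each list locally with mapValues, which avoids the reduceByKey/groupByKey shuffles, assuming each list fits comfortably in memory for one record (the sum_by_key helper name is just illustrative):

    from collections import defaultdict

    def sum_by_key(pairs):
        # Locally sum a plain list of (key, value) tuples by key
        totals = defaultdict(int)
        for k, v in pairs:
            totals[k] += v
        return list(totals.items())

    data.mapValues(sum_by_key).collect()
    # [(1, [('k1', 6), ('k2', 3)]), (2, [('k3', 9), ('k1', 6)])]
    # (order within each inner list may differ depending on the Python version)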