
Filtering RDDs based on value of Key


I have two RDDs that wrap the following arrays:

Array((3,Ken), (5,Jonny), (4,Adam), (3,Ben), (6,Rhonda), (5,Johny))

Array((4,Rudy), (7,Micheal), (5,Peter), (5,Shawn), (5,Aaron), (7,Gilbert))

I need to write code such that if I provide 3 as the input, it returns:

Array((3,Ken), (3,Ben))

If the input is 6, the output should be:

Array((6,Rhonda))

I tried something like this:

val list3 = list1.union(list2)

list3.reduceByKey(_+_).collect   

list3.reduceByKey(6).collect 

Neither of these worked. Can anyone help me out with a solution to this problem?


Solution

  • Given the following, which you would have to define yourself:

    import org.apache.spark.SparkContext

    // Provide your SparkContext and inputs here
    val sc: SparkContext = ???
    val array1: Array[(Int, String)] = ???
    val array2: Array[(Int, String)] = ???
    val n: Int = ???

    val rdd1 = sc.parallelize(array1)
    val rdd2 = sc.parallelize(array2)
    

    You can use union and filter to reach your goal:

    rdd1.union(rdd2).filter(_._1 == n)
    
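    For instance, with the sample data from the question and n = 3, collecting the result yields the expected pairs (a minimal sketch; the order of the collected elements may vary across partitions):

    rdd1.union(rdd2).filter(_._1 == 3).collect()
    // Array((3,Ken), (3,Ben))
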

    Since filtering by key is something that you would probably want to do on several occasions, it makes sense to encapsulate this functionality in its own function.

    It would also be interesting if we could make sure that this function works with any type of key, not only Int.

    You can express this in the old RDD API as follows:

    import org.apache.spark.rdd.RDD

    def filterByKey[K, V](rdd: RDD[(K, V)], k: K): RDD[(K, V)] =
      rdd.filter(_._1 == k)
    

    You can use it as follows:

    val rdd = rdd1.union(rdd2)
    
    val filtered = filterByKey(rdd, n)
    
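    Since filterByKey is generic in K, the same function works unchanged with other key types, for example String keys (hypothetical data, purely for illustration):

    val byName = sc.parallelize(Array(("spark", 1), ("flink", 2), ("spark", 3)))
    filterByKey(byName, "spark").collect()
    // Array((spark,1), (spark,3))

    This works because == in Scala delegates to equals, so any key type with a sensible notion of equality can be used.
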

    Let's look at this method in a little more detail.

    This method filters an RDD containing generic pairs, where the type of the first item is K and the type of the second is V (for key and value). It also accepts a key of type K that will be used to filter the RDD.

    You then use the filter function, which takes a predicate (a function that goes from some type - in this case the pair (K, V) - to a Boolean) and makes sure that the resulting RDD only contains items that satisfy this predicate.

    We could also have written the body of the function as:

    rdd.filter(pair => pair._1 == k)
    

    or

    rdd.filter { case (key, value) => key == k }
    

    but we took advantage of the _ wildcard to express the fact that we want to act on the first (and only) parameter of this anonymous function.

    To use it, you first parallelize your arrays into RDDs, call union on them, and then invoke the filterByKey function with the key you want to filter by (as shown in the example).
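
    Putting the pieces together, here is a minimal, self-contained sketch (the object name and app name are placeholders, and a local SparkContext is assumed purely for demonstration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    object FilterByKeyExample {
      def filterByKey[K, V](rdd: RDD[(K, V)], k: K): RDD[(K, V)] =
        rdd.filter(_._1 == k)

      def main(args: Array[String]): Unit = {
        // Local SparkContext for demonstration purposes
        val sc = new SparkContext(
          new SparkConf().setAppName("filter-by-key").setMaster("local[*]"))

        val rdd1 = sc.parallelize(Array(
          (3, "Ken"), (5, "Jonny"), (4, "Adam"), (3, "Ben"), (6, "Rhonda"), (5, "Johny")))
        val rdd2 = sc.parallelize(Array(
          (4, "Rudy"), (7, "Micheal"), (5, "Peter"), (5, "Shawn"), (5, "Aaron"), (7, "Gilbert")))

        // Union the two RDDs and keep only the pairs whose key is 3
        filterByKey(rdd1.union(rdd2), 3).collect().foreach(println)
        // (3,Ken)
        // (3,Ben)

        sc.stop()
      }
    }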