Tags: scala, apache-spark, rdd

Join two RDDs, one of which has only keys and no values


Given two large RDDs, a containing (key, value) pairs and b containing only keys, what would be the best way to join them so that only the rows of a whose keys appear in b are kept?

More specifically, this is what I want to do:

val a: RDD[(Int, Double)] = ...
val b: RDD[Int] = ...
val c: RDD[(Int, Double)] = a.myFilterJoin(b)

where c contains only the rows of a that match the keys in b, and we can assume that a contains only unique keys. Is there anything like myFilterJoin available?

Note that if b were small enough, I could simply broadcast it as a set and then use it as a filter on a. But let's assume b is large enough for this broadcast to be prohibitively expensive.
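
For reference, a minimal sketch of that broadcast filter for the small-b case. The local SparkSession, the app name and the sample data are assumptions made purely for illustration:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical local session, only for trying the sketch out.
val spark = SparkSession.builder().master("local[*]").appName("broadcast-filter-sketch").getOrCreate()
val sc = spark.sparkContext

// Made-up sample data.
val a: RDD[(Int, Double)] = sc.parallelize(Seq(1 -> 0.0, 2 -> 3.3, 3 -> 5.5, 5 -> 10.11))
val b: RDD[Int] = sc.parallelize(Seq(2, 3, 4, 5))

// Collect the keys on the driver and ship them to every executor once.
// This is only viable while the whole key set fits comfortably in memory.
val keys = sc.broadcast(b.collect().toSet)

// Keep only the rows of a whose key is in the broadcast set; no shuffle is needed.
val c: RDD[(Int, Double)] = a.filter { case (key, _) => keys.value.contains(key) }
```

This avoids a shuffle entirely, which is why it is attractive when b is small, but the collect() step is exactly what becomes too expensive once b grows.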

What I usually do is add a dummy value to b so that it takes the form (key, dummy), do the join, and then drop the dummy value in a map. But this seems quite hacky, and I was wondering whether there is a better solution.
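
The dummy-value workaround described above could be sketched roughly as follows. The local SparkSession, the app name, the sample data and the name bKeyed are assumptions for illustration only:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical local session, only for trying the sketch out.
val spark = SparkSession.builder().master("local[*]").appName("dummy-join-sketch").getOrCreate()
val sc = spark.sparkContext

// Made-up sample data.
val a: RDD[(Int, Double)] = sc.parallelize(Seq(1 -> 0.0, 2 -> 3.3, 3 -> 5.5, 5 -> 10.11))
val b: RDD[Int] = sc.parallelize(Seq(2, 3, 4, 5))

// Turn b into a pair RDD by attaching a Unit placeholder to every key.
val bKeyed: RDD[(Int, Unit)] = b.map(key => (key, ()))

// The inner join yields (key, (value, ())); mapValues drops the placeholder again.
val c: RDD[(Int, Double)] = a.join(bKeyed).mapValues { case (value, _) => value }
```

If b may contain duplicate keys, the join emits one row per matching pair, so calling b.distinct() first avoids duplicated rows in c at the cost of an extra shuffle.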


Solution

  • Similar to ShemTov's answer, but preserving type safety by using Datasets instead of DataFrames.
    (PS: I would recommend just using Datasets instead of RDDs.)

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._
    
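    // Sample data: a holds (key, value) pairs, b holds only keys.
    val a = sc.parallelize(List(1 -> 0.0, 2 -> 3.3, 3 -> 5.5, 5 -> 10.11))
    val b = sc.parallelize(List(2, 3, 4, 5))
    
    // b.toDS has a single column named "value"; a.toDS has columns "_1" and "_2".
    // joinWith keeps both sides typed, yielding Dataset[(Int, (Int, Double))].
    val c = b.toDS.joinWith(a.toDS, $"value" === $"_1", "inner").map {
      case (_, (key, value)) => key -> value
    }.rdd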