Given two large RDDs, a with a set of (key, value) pairs and b with only keys, what would be the best way to join them such that a keeps only those rows that match the keys of b?
More specifically, this is what I want to do:
val a: RDD[(Int, Double)] = ...
val b: RDD[Int] = ...
val c: RDD[(Int, Double)] = a.myFilterJoin(b)
where c contains only the rows of a that match the keys in b, and we can assume that a contains only unique keys. Is there anything like myFilterJoin available?
Note that if b were small enough, I could simply broadcast it as a set and then use it as a filter on a. But let's assume b is large enough for this broadcast to be prohibitively expensive.
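For reference, this is roughly what that broadcast variant would look like, reusing a and b from above:

val bSet = sc.broadcast(b.collect().toSet) // only viable when b fits in driver memory
val c: RDD[(Int, Double)] = a.filter { case (key, _) => bSet.value.contains(key) }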
What I usually do is add a dummy value to b so that b gets the form (key, dummy), do the join, and then remove the dummy variable in a map. But this seems quite hacky, and I was wondering if there's a better solution.
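In code, that workaround looks something like this (using Unit as the dummy value; any placeholder would do):

val c: RDD[(Int, Double)] =
  a.join(b.map(key => (key, ()))) // attach a dummy so b becomes a pair RDD
   .map { case (key, (value, _)) => (key, value) } // drop the dummy again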
Similar to ShemTov's answer but preserving type safety by using Datasets instead of DataFrames.
(PS: I would recommend just using Datasets instead of RDDs.)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val a = sc.parallelize(List((1 -> 0.0), (2 -> 3.3), (3 -> 5.5), (5 -> 10.11)))
val b = sc.parallelize(List(2, 3, 4, 5))

// b.toDS has a single column named "value"; a.toDS exposes the tuple fields as "_1" and "_2".
val c = b.toDS.joinWith(a.toDS, $"value" === $"_1", "inner").map {
  case (_, (key, value)) => key -> value // keep only the typed (key, value) pairs from a
}.rdd
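With the sample data above, c contains (2, 3.3), (3, 5.5) and (5, 10.11): key 4 from b has no match in a, and key 1 from a is dropped by the inner join. The type safety comes from joinWith, which returns a Dataset of typed pairs instead of flattening both sides into an untyped Row.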