Tags: python, apache-spark, rdd, set-difference

Perform Set Difference on RDDs in Spark Python


I have two Spark RDDs: A has 301,500,000 rows and B has 1,500,000 rows. All 1,500,000 rows of B also appear in A. I would like the set difference of the two, so that I am left with A's remaining 300,000,000 rows, with the 1,500,000 rows from B no longer present.

I cannot use Spark DataFrames.

Here is the approach I am using right now. Both RDDs have primary keys in their first column. Below, I collect the primary keys of B into a list, then filter A down to the rows whose primary key does not appear in that list.

a = sc.parallelize([[0, 'foo', 'a'], [1, 'bar', 'b'], [2, 'mix', 'c'],
                    [3, 'hem', 'd'], [4, 'line', 'e']])
b = sc.parallelize([[1, 'bar', 'b'], [2, 'mix', 'c']])
b_primary_keys = b.map(lambda x: x[0]).collect()  # first column is the primary key


def sep_a_and_b(row):
    # Keep the row only if its primary key never appears in B.
    if row[0] not in b_primary_keys:
        return row


a_minus_b = a.map(sep_a_and_b).filter(lambda x: x is not None)

Now, this works on the sample problem because A and B are tiny. However, it breaks down on my true datasets: collecting B's 1.5 million keys to the driver and checking every row of A against that Python list (a linear scan per row) does not scale. Is there a better (more parallel) way to implement this?


Solution

  • This seems like something you can solve with subtractByKey:

    filtered_a = a.subtractByKey(b)
    

    To convert an RDD of rows into key-value pairs first:

    key_val_rdd = rdd.map(lambda x: (x[0], x[1:]))  # x[0] is hashable; x[:1] would be an unhashable list
    

    *Note that my Python is weak and there might be better ways to split the values.
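
    Putting the pieces together, here is a minimal end-to-end sketch against the sample RDDs from the question. The SparkContext setup is illustrative (in the pyspark shell, sc already exists):

    from pyspark import SparkContext

    sc = SparkContext("local", "set-difference")  # illustrative; skip in the pyspark shell

    a = sc.parallelize([[0, 'foo', 'a'], [1, 'bar', 'b'], [2, 'mix', 'c'],
                        [3, 'hem', 'd'], [4, 'line', 'e']])
    b = sc.parallelize([[1, 'bar', 'b'], [2, 'mix', 'c']])

    # Key both RDDs by their primary key (first column).
    a_kv = a.map(lambda x: (x[0], x[1:]))
    b_kv = b.map(lambda x: (x[0], x[1:]))

    # Keep only the pairs of a_kv whose key never occurs in b_kv.
    a_minus_b = a_kv.subtractByKey(b_kv)

    # Restore the original row layout.
    result = a_minus_b.map(lambda kv: [kv[0]] + kv[1])

    print(sorted(result.collect()))
    # [[0, 'foo', 'a'], [3, 'hem', 'd'], [4, 'line', 'e']]

    Unlike the collect-and-filter approach, subtractByKey runs as a distributed shuffle: B's keys are never gathered on the driver, so this should hold up on the full 301.5-million-row dataset.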