python · apache-spark · pyspark · rdd

Filter an RDD depending on values of a second RDD


I have two RDDs and I would like to filter one by the values of the other.

A few elements of each RDD are as follows:

rdd1 = [((address1, date1),1), ((address5, date2),1), ((address1, date2),1), ((address2,date3),1)]
rdd2 = [(address1,1), (address1,1), (address2, 1), (address1, 1)]

The desired output would be:

joined_rdd = [((address1, date1),1),((address1, date2),1),((address2,date3),1)]

So basically I want to keep a tuple in rdd1 only if its address appears in rdd2.


Solution

  • Do a join and discard everything from rdd2:

    rdd1 = sc.parallelize([(('address1', 'date1'), 1), (('address5', 'date2'), 1), (('address1', 'date2'), 1), (('address2', 'date3'), 1)])
    rdd2 = sc.parallelize([('address1', 1), ('address1', 1), ('address2', 1), ('address1', 1)])
    
    # Key rdd1 by its address, join against the distinct addresses of rdd2,
    # then map back to the original ((address, date), 1) tuples.
    result_rdd = (rdd1.keyBy(lambda x: x[0][0])
                      .join(rdd2.map(lambda x: x[0])
                                .keyBy(lambda x: x)
                                .distinct())
                      .map(lambda x: x[1][0]))
    
    result_rdd.collect()
    [(('address2', 'date3'), 1), (('address1', 'date1'), 1), (('address1', 'date2'), 1)]
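
  • Alternatively, if the distinct addresses in rdd2 are few enough to collect to the driver, you can broadcast them as a set and filter rdd1 directly, avoiding the shuffle of a join. This is a minimal sketch of that approach; the names addresses and filtered_rdd are introduced here for illustration:

    # Collect the distinct addresses from rdd2 and broadcast them to the
    # executors (assumes this set comfortably fits in driver memory).
    addresses = sc.broadcast(set(rdd2.keys().distinct().collect()))
    
    # Keep only the rdd1 tuples whose address is in the broadcast set.
    filtered_rdd = rdd1.filter(lambda x: x[0][0] in addresses.value)
    
    filtered_rdd.collect()
    [(('address1', 'date1'), 1), (('address1', 'date2'), 1), (('address2', 'date3'), 1)]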