
Pyspark - how to filter RDD with Broadcast Dictionary on key AND value


I'm attempting to filter a large RDD based on a broadcast variable.

I was able to do the following, which keeps the tuples whose first element exists as a key in the broadcast dictionary.

# assumes a SparkContext named sc, e.g. from the pyspark shell
nested_filter = {'india': 'ind', 'usa': 'us'}
b_filter = sc.broadcast(nested_filter)

rdd_set = sc.parallelize([('india', 'ind'), ('india', 'nope'), ('usa', 'us'),
                          ('japan', 'jpn'), ('uruguay', 'urg')])

Filter:

rdd_set.filter(lambda fields: fields[0] in b_filter.value).collect()

This returns:

[('india', 'ind'), ('india', 'nope'), ('usa', 'us')]

My issue is that I also need to match the value: a row should pass the filter only if its key is in the broadcast dictionary and its value equals the value stored under that key.

The correct result should be:

[('india', 'ind'), ('usa', 'us')]

The actual RDD will have several billion rows, and the broadcast dictionary will contain several million keys. Can someone please show me the most efficient way to do this?


Solution

  • You can use items() to get the dictionary's key-value pairs as tuples, then check whether each row is one of those pairs:

    rdd_set.filter(lambda fields: fields in b_filter.value.items()).collect()
    #[('india', 'ind'), ('usa', 'us')]
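
  • A note on efficiency, given the dictionary holds several million keys: in Python 3, items() returns a view whose membership test is a constant-time dictionary lookup, but in Python 2 it returns a list, so the check above scans that list for every row. A direct get() lookup is constant time in both versions; a minimal sketch, reusing the rdd_set and b_filter defined above:

    # Keep a row only if its key maps to exactly its value in the broadcast dict;
    # dict.get() is a single hash lookup per row, independent of dictionary size.
    rdd_set.filter(lambda fields: b_filter.value.get(fields[0]) == fields[1]).collect()
    #[('india', 'ind'), ('usa', 'us')]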