I'm attempting to filter a large RDD based on a broadcast variable.
I was able to do the following, which keeps only the tuples whose first element exists as a key in the broadcast variable.
nested_filter = {"india":'ind',"usa":'us'}
b_filter = sc.broadcast(nested_filter)
rdd_set = sc.parallelize([('india', 'ind'), ('india', 'nope'), ('usa', 'us'),
                          ('japan', 'jpn'), ('uruguay', 'urg')])
Filter:
rdd_set.filter(lambda fields: fields[0] in b_filter.value).collect()
This returns:
[('india', 'ind'), ('india', 'nope'), ('usa', 'us')]
My issue is that I want to filter on both the key in the broadcast dictionary and the value associated with that key.
The correct result should be:
[('india', 'ind'), ('usa', 'us')]
The actual RDD will have several billion rows, and the broadcast dictionary will contain several million keys. Can someone please show me the most efficient way to do this?
You can use items() to get the key-value pairs of your dictionary as tuples, then check whether each row is among them:
rdd_set.filter(lambda fields: fields in b_filter.value.items()).collect()
#[('india', 'ind'), ('usa', 'us')]
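Since efficiency matters at this scale, it may be worth making the lookup explicit. On Python 2, items() builds a full list, so the membership test scans it for every row; on Python 3 it returns a view, where the test is a hash lookup. A key lookup followed by a value comparison behaves the same on both. The sketch below is plain Python so it runs without a cluster; the helper name matches_pair is hypothetical, and it assumes every row is exactly a two-element tuple.

```python
def matches_pair(fields, lookup):
    """Return True when fields is a (key, value) pair stored in lookup."""
    key, value = fields  # assumes each row is a 2-tuple
    # Hash lookup on the key first, then compare the stored value.
    return key in lookup and lookup[key] == value


nested_filter = {"india": "ind", "usa": "us"}
rows = [("india", "ind"), ("india", "nope"), ("usa", "us"),
        ("japan", "jpn"), ("uruguay", "urg")]

# Plain-Python stand-in for the RDD filter.
kept = [row for row in rows if matches_pair(row, nested_filter)]
print(kept)
# [('india', 'ind'), ('usa', 'us')]
```

With the RDD from the question, the same predicate would be applied as rdd_set.filter(lambda fields: matches_pair(fields, b_filter.value)).collect().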