I have a Python dictionary, which I broadcast, that contains a date filter for each user.
nested_filter = {"user1":"2018-02-15"}
b_filter = sc.broadcast(nested_filter)
I want to use this broadcast variable to filter a larger RDD, keeping only the rows whose date is on or before that user's filter date.
rdd_set = sc.parallelize([("user1","2018-02-05"), ("user1","2018-02-20")])
rdd_set.filter(lambda fields: fields <= b_filter.value.items()).collect()
But it returns an empty RDD.
Could someone please point out what I am doing incorrectly? Also, do I need to convert the string dates to a date object?
The correct result should be:
[("user1","2018-02-05")]
Observe that the value returned by b_filter.value.items() inside your filter call is the same as:
nested_filter.items()
#[('user1', '2018-02-15')]
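So your comparison becomes:
("user1","2018-02-05") <= [('user1', '2018-02-15')]
#False
In Python 2 this is False for every row, because values of different types (here a tuple and a list) are ordered by their type rather than by their contents, and a tuple always sorts after a list. That is why you get an empty result. In Python 3, where items() returns a view instead of a list, the same comparison raises a TypeError.
Assuming that nested_filter is a dictionary with just one item (as shown here), what you probably intended was to compare against the first element of that list:
("user1","2018-02-05") <= list(nested_filter.items())[0]
#True
Tuples are compared element by element: the user names are equal, so the date strings decide the result. The list(...) call is needed on Python 3, where dict.items() cannot be indexed.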
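Both comparisons can be checked in plain Python 3 without Spark. This sketch (no Spark involved; compare_with_items is a hypothetical helper) mirrors the original lambda by comparing a tuple with dict.items(), which fails on Python 3, and then compares two tuples element by element:

```python
# Plain Python 3 sketch (no Spark): how the comparisons inside the filter behave.
nested_filter = {"user1": "2018-02-15"}
row = ("user1", "2018-02-05")


def compare_with_items(row, filters):
    """Mirror the original lambda: compare a tuple with dict.items()."""
    try:
        return row <= filters.items()  # tuple vs dict_items view
    except TypeError as exc:
        # Python 3 refuses to order a tuple against a dict_items view.
        return "TypeError: {}".format(exc)


# Tuple vs tuple: "user1" == "user1", so the date strings decide the result.
first_item = list(nested_filter.items())[0]  # ('user1', '2018-02-15')

print(compare_with_items(row, nested_filter))
print(row <= first_item)  # True
```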
So to "fix" your code, you could do the following:
rdd_set.filter(lambda fields: fields <= list(b_filter.value.items())[0]).collect()
#[('user1', '2018-02-05')]
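Comparing whole (user, date) tuples only gives the right answer here because every row belongs to user1. A plain-Python sketch shows that, once other users appear, the user name is compared first and the date may never be looked at. The users alice and zoe and their dates are made up for illustration:

```python
# Plain Python 3 sketch: comparing whole (user, date) tuples against one filter entry.
cutoff = ("user1", "2018-02-15")  # first (and only) item of nested_filter

# Hypothetical rows: "alice" and "zoe" are illustrative users with no filter of their own.
rows = [
    ("user1", "2018-02-05"),
    ("user1", "2018-02-20"),
    ("alice", "2099-01-01"),  # "alice" < "user1", so this row passes regardless of date
    ("zoe", "2000-01-01"),    # "zoe" > "user1", so this row fails regardless of date
]

kept = [row for row in rows if row <= cutoff]
print(kept)
# [('user1', '2018-02-05'), ('alice', '2099-01-01')]
```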
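But what I think you actually want is the following:
rdd_set.filter(lambda fields: fields[0] in b_filter.value and fields[1] <= b_filter.value[fields[0]]).collect()
#[('user1', '2018-02-05')]
This uses fields[0] to look up that user's cutoff date in the broadcast dictionary and compares it with the row's date, fields[1]. Rows for users that have no entry in the dictionary are dropped. The membership check matters on Python 3: writing fields[1] <= b_filter.value.get(fields[0]) would compare a string with None for such users, which raises a TypeError (Python 2 silently evaluates it as False).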
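The per-user logic can be tried without a cluster. In this plain-Python sketch a dict stands in for b_filter.value and a list comprehension stands in for rdd_set.filter(...).collect(); keep_row and the extra user2 row are hypothetical:

```python
# Plain Python 3 sketch of the per-user filter (no Spark).
nested_filter = {"user1": "2018-02-15"}

rows = [
    ("user1", "2018-02-05"),
    ("user1", "2018-02-20"),
    ("user2", "2018-01-01"),  # hypothetical user with no entry in nested_filter
]


def keep_row(fields, filters):
    """Keep a (user, date) row if the user has a cutoff and the date is on or before it."""
    user, date = fields
    cutoff = filters.get(user)
    # Without the None check, Python 3 would raise TypeError comparing str with None.
    return cutoff is not None and date <= cutoff


result = [row for row in rows if keep_row(row, nested_filter)]
print(result)
# [('user1', '2018-02-05')]
```

In Spark the same helper could be passed as rdd_set.filter(lambda fields: keep_row(fields, b_filter.value)), which keeps the lookup readable when the condition grows.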
As you noted, this comparison is done lexicographically on the strings. That is not a problem as long as your dates stay in the zero-padded YYYY-MM-DD format, where string order matches chronological order, but for other date formats you may need to convert the strings to datetime objects first.
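To show why other formats need conversion, here is a plain-Python sketch assuming a hypothetical day-first DD/MM/YYYY format (the dates are illustrative); datetime.strptime is from the standard library:

```python
from datetime import datetime

# Hypothetical day-first dates; the data in this question uses YYYY-MM-DD instead.
DATE_FORMAT = "%d/%m/%Y"
cutoff = "15/02/2018"
dates = ["05/03/2018", "20/01/2018"]


def parse(date_string):
    """Convert a DD/MM/YYYY string into a datetime so comparisons are chronological."""
    return datetime.strptime(date_string, DATE_FORMAT)


# String comparison looks at the day digits first, so it gets both dates wrong.
by_string = [d for d in dates if d <= cutoff]
# Parsed comparison orders by year, then month, then day.
by_datetime = [d for d in dates if parse(d) <= parse(cutoff)]

print(by_string)    # ['05/03/2018']
print(by_datetime)  # ['20/01/2018']
```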