python, apache-spark, pyspark, rdd

Pyspark - Filter RDD With Dates in Broadcast Dictionary


I have a Python dictionary that I broadcast; it contains a filter date per user.

nested_filter = {"user1":"2018-02-15"}
b_filter = sc.broadcast(nested_filter)

I want to use this broadcast variable to filter a larger RDD, keeping only the rows whose date falls before the user's filter date.

rdd_set = sc.parallelize([("user1","2018-02-05"), ("user1","2018-02-20")])

rdd_set.filter(lambda fields: fields <= b_filter.value.items()).collect()

But it returns an empty RDD.

Could someone please point out what I am doing incorrectly? Also, do I need to convert the string dates to a date object?

The correct result should be:

[("user1","2018-02-05")]

Solution

  • Observe that the value returned by b_filter.value.items() inside your filter call is the same as:

    nested_filter.items()
    #[('user1', '2018-02-15')]
    

    So then your comparison becomes:

    ("user1","2018-02-05") < [('user1', '2018-02-15')]
    #False
    

    Which is False, because you are comparing a tuple against a list (under Python 2, objects of mismatched types are ordered by type name, so a tuple never compares less than a list; under Python 3 the comparison raises a TypeError instead). Assuming that nested_filter is a dictionary with just one item (as shown here), what you probably intended was to compare against the first element of that list:

    ("user1","2018-02-05") < nested_filter.items()[0]
    #True
    

    So to "fix" your code, you could do the following:

    rdd_set.filter(lambda fields: fields <= b_filter.value.items()[0]).collect()
    #[('user1', '2018-02-05')]
    

    But instead, what I think you actually want is the following:

    rdd_set.filter(lambda fields: fields[1] <= b_filter.value.get(fields[0])).collect()
    #[('user1', '2018-02-05')]
    

    This uses fields[0] to look up the filter date in nested_filter (or None if that user isn't present) and compares the result against fields[1].
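    If a user in the RDD is missing from the dictionary, that comparison against None raises a TypeError under Python 3 (and quietly evaluates to False under Python 2). A minimal sketch of a guarded variant, checking membership before comparing, could look like this:

    rdd_set.filter(
        lambda fields: fields[0] in b_filter.value
        and fields[1] <= b_filter.value[fields[0]]
    ).collect()
    #[('user1', '2018-02-05')]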

    As you have noted, this comparison happens lexicographically on the strings. That is safe as long as your dates stay in zero-padded YYYY-MM-DD format, since ISO-style strings sort in the same order as the dates they represent; for other date formats you would need to convert the strings to datetime objects.
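    For example, a minimal sketch of that conversion (assuming a format string that matches your data; "%Y-%m-%d" matches the sample rows here, and you would swap in whatever format your dates actually use) parses both sides with datetime.strptime before comparing:

    from datetime import datetime

    fmt = "%Y-%m-%d"  # replace with your actual date format
    rdd_set.filter(
        lambda fields: fields[0] in b_filter.value
        and datetime.strptime(fields[1], fmt)
        <= datetime.strptime(b_filter.value[fields[0]], fmt)
    ).collect()
    #[('user1', '2018-02-05')]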