Tags: python, pyspark, rdd

How to get distinct dicts with nested list of RDD in Pyspark?


I have a similar question to: How can I get a distinct RDD of dicts in PySpark?

However, there is a difference. I have dicts with a string as key and a list as value, in the following shape:

{"link0":["link1","link2",...]}.

Dicts of this shape are stored in each of my RDD partitions.

Calling collect() gives me back a list of them:

[{"link0":["link1","link2",...]}, {"link1":["link2","link3",...]}, ...]

Assume, for example, that partition one of my RDD stores:

[{"link0":["link1","link2"]}, {"link1":["link2","link3"]}] and

in partition two:

[{"link0":["link1","link2"]}, {"link3":["link4","link5"]}]

What I actually want is to get all distinct dicts across the RDD, just as in the linked question:

[{"link0":["link1","link2"]}, {"link1":["link2","link3"]}, 
{"link3":["link4","link5"]}] 

However, I am struggling with how to handle the lists in the values. Do you have any recommendations?

I tried to apply the dict_to_string() method mentioned there, but I am not sure whether that is really the right way to handle this.

I have also thought about changing the data structure altogether to a better-suited one.

Do you have any ideas what might fit better for my purpose?
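(For illustration, one hashable alternative would be a plain (key, tuple_of_links) pair per record, since tuples, unlike dicts, can go straight through distinct(). A minimal sketch with the sample data in that shape:)

```python
# the same sample data as hashable (key, tuple) pairs instead of
# single-key dicts -- a hypothetical alternative shape; being hashable,
# these elements can be deduplicated directly
records = [("link0", ("link1", "link2")),
           ("link1", ("link2", "link3")),
           ("link0", ("link1", "link2")),
           ("link3", ("link4", "link5"))]

# set() removes the duplicate ("link0", ...) record; sorted() just
# fixes the order for display
unique = sorted(set(records))
```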

Once I have all the distinct key:[] pairs, I want to filter out all the unique links that appear in the value lists but do not already appear as a key in any dict, and store them in a new list:

["link2", "link4", "link5"]

If you have any ideas, I'd be happy to hear them!

Constructive help appreciated. Thanks.


Solution

  • As noted in the comments, the dicts always contain a single key with a list as its value. You can try the following approach:

    rdd = sc.parallelize([
        {"link0":["link1","link2"]}, {"link1":["link2","link3"]},
        {"link0":["link1","link2"]}, {"link3":["link4","link5"]}])
    

    Task-1: find unique RDD elements:

    Use flatMap to convert each dict into a (key, value) tuple, turning the value list into a tuple so that the RDD elements are hashable; take distinct(), and then map the elements back to their original data structure:

    rdd.flatMap(lambda x: [ (k,tuple(v)) for k,v in x.items() ]) \
       .distinct() \
       .map(lambda x: {x[0]:list(x[1])}) \
       .collect()
    #[{'link0': ['link1', 'link2']},
    # {'link1': ['link2', 'link3']},
    # {'link3': ['link4', 'link5']}]
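    The dict_to_string() idea mentioned in the question works too: serialize each dict to a canonical string, deduplicate on the strings, and parse back. A minimal plain-Python sketch of that round trip (on the RDD this would become a map over json.dumps, then distinct(), then a map over json.loads):

```python
import json

data = [
    {"link0": ["link1", "link2"]}, {"link1": ["link2", "link3"]},
    {"link0": ["link1", "link2"]}, {"link3": ["link4", "link5"]},
]

# dicts are unhashable, but json.dumps with sort_keys=True yields a
# deterministic string for equal dicts, which can be deduplicated
seen = set()
unique = []
for d in data:
    s = json.dumps(d, sort_keys=True)
    if s not in seen:
        seen.add(s)
        unique.append(json.loads(s))
```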
    

    Task-2: find the unique links that appear in values but not among the dict keys:

    Retrieve all unique keys into rdd1 and all unique values into rdd2, then compute rdd2.subtract(rdd1):

    rdd1 = rdd.flatMap(lambda x: x.keys()).distinct()
    # ['link0', 'link1', 'link3']
    
    rdd2 = rdd.flatMap(lambda x: [ v for vs in x.values() for v in vs ]).distinct()
    # ['link1', 'link2', 'link3', 'link4', 'link5']
    
    rdd2.subtract(rdd1).collect()
    # ['link2', 'link5', 'link4']
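
    If the distinct result is small enough to collect to the driver, the same subtraction can also be done with plain Python sets, a minimal sketch:

```python
# the distinct dicts produced by Task-1
dicts = [{"link0": ["link1", "link2"]},
         {"link1": ["link2", "link3"]},
         {"link3": ["link4", "link5"]}]

keys = {k for d in dicts for k in d}                           # all dict keys
values = {v for d in dicts for vs in d.values() for v in vs}  # all linked values

# links that only ever appear inside value lists, never as a key
leaf_links = sorted(values - keys)
```

    Note that subtract() on the RDD returns the result in arbitrary order, whereas sorted() here fixes it.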