apache-spark | pyspark | rdd

PySpark set on a list of tuples


I am relatively new to PySpark. I have an RDD made up of a list of tuples. I would like to call a function on the RDD that does the equivalent of Python's set() function, getting rid of any duplicates.

I am also trying to understand this theoretically on a distributed system. How does a set operation work if the RDD is spread out across multiple workers? How does it determine whether something is a duplicate if it is working with a plain RDD and not a pair RDD?

Given an input RDD of tuples, I want the unique tuples, where two tuples count as duplicates if they contain the same elements, i.e. the order within a tuple does not matter.

Input:

myTup = [('cat', 'dog'), ('mouse', 'duck'), ('duck', 'cat'), ('cat', 'dog'), ('dog', 'cat'), ('dog', 'horse'), ('cat', 'duck'), ('dog', 'horse'), ('dog', 'horse')]

I would like to do something equivalent to:

tuple_fix = list(set([tuple(sorted(t)) for t in myTup]))

And get the output (each tuple sorted; set ordering is arbitrary):

[('cat', 'dog'), ('duck', 'mouse'), ('cat', 'duck'), ('dog', 'horse')]

Thanks for taking the time!


Solution

  • Here's a high-level explanation that hopefully shows how this can work¹ in a distributed system.

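    (The snippets below assume an existing SparkContext named sc, as you would have in the pyspark shell. If you are running this as a standalone script, a minimal local setup might look like the following; the app name is just an illustrative placeholder.)

    from pyspark import SparkContext

    # Minimal local context; "local[*]" uses all available local cores.
    sc = SparkContext("local[*]", "dedupe-tuples")
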
    First, make an RDD out of myTup:

    rdd = sc.parallelize(myTup)
    print(rdd.collect())
    #[('cat', 'dog'),
    # ('mouse', 'duck'),
    # ('duck', 'cat'),
    # ('cat', 'dog'),
    # ('dog', 'cat'),
    # ('dog', 'horse'),
    # ('cat', 'duck'),
    # ('dog', 'horse'),
    # ('dog', 'horse')]
    

    Each tuple can be sorted independently, so each worker can grab a subset of the rows and sort them. This step is pretty straightforward because it needs no data from other workers.

    sorted_rdd = rdd.map(lambda t: tuple(sorted(t)))
    print(sorted_rdd.collect())
    #[('cat', 'dog'),
    # ('duck', 'mouse'),
    # ('cat', 'duck'),
    # ('cat', 'dog'),
    # ('cat', 'dog'),
    # ('dog', 'horse'),
    # ('cat', 'duck'),
    # ('dog', 'horse'),
    # ('dog', 'horse')]
    

    In order to get the distinct elements from sorted_rdd, you can use distinct(). The way this can be done in a distributed fashion is through hashing: a hashing algorithm decides which worker (reducer) gets each row. This splits the data up among your executors while ensuring that all duplicates end up on the same machine.

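    As a minimal sketch of that idea in plain Python (not Spark's actual partitioner; the partition count of 4 and the use of the built-in hash() are illustrative assumptions): identical tuples always hash to the same value, so every copy of a duplicate lands on the same "machine", where it can be deduplicated locally.

    num_partitions = 4
    partitions = {i: [] for i in range(num_partitions)}

    for t in sorted_rdd.collect():
        # Identical tuples hash identically, so all copies of a
        # duplicate are routed to the same partition.
        partitions[hash(t) % num_partitions].append(t)

    # Each partition can now be deduplicated independently.
    print([t for part in partitions.values() for t in set(part)])
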
    Finally, each machine just emits the distinct tuples from the data it has been sent.

    print(sorted_rdd.distinct().collect())
    #[('cat', 'dog'), ('duck', 'mouse'), ('dog', 'horse'), ('cat', 'duck')]
    
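    Incidentally, this is essentially how Spark's own RDD.distinct() works under the hood: a map/reduceByKey round trip on (value, dummy) pairs, i.e. the hash-partitioned shuffle described above. You can reproduce it by hand (distinct_rdd is just a local name):

    distinct_rdd = (sorted_rdd
                    .map(lambda t: (t, None))     # key = the tuple itself
                    .reduceByKey(lambda a, b: a)  # duplicates collapse per key
                    .map(lambda kv: kv[0]))       # drop the dummy value
    print(distinct_rdd.collect())
    # e.g. [('cat', 'dog'), ('duck', 'mouse'), ('dog', 'horse'), ('cat', 'duck')]
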

    Notes:

    1: I am not certain that this is exactly how distinct() is implemented, but it is one way to do it.