Tags: python-3.x, apache-spark, pyspark, rdd

How to create a new RDD with all possible combinations of the elements of another RDD in PySpark?


Hi, I have created an RDD like below:

rdd1=sc.parallelize(['P','T','K'])
rdd1.collect()
['P', 'T', 'K']

Now I want to create a new RDD2 with all possible combinations like below, i.e. excluding combinations that repeat the same element, such as (P,P), (K,K), (T,T).

My expected output when I run:

RDD2.collect()

[
    ('P'),('T'),('K'),
    ('P','T'),('P','K'),('T','K'),('T','P'),('K','P'),('K','T'),
    ('P','T','K'),('P','K','T'),('T','P','K'),('T','K','P'),('K','P','T'),('K','T','P')
]

Solution

  • It seems that you want to generate all permutations of the elements in your rdd where each row contains unique values.

    One way would be to first create a helper function to generate the desired combinations of length n:

    from functools import reduce
    from itertools import chain
    
    def combinations_of_length_n(rdd, n):
        # for n > 0: chains n-1 cartesian products of the rdd with itself,
        # flattening each nested row back into a flat tuple, and then keeps
        # only the rows whose n values are all distinct
        return reduce(
            lambda a, b: a.cartesian(b).map(lambda x: tuple(chain.from_iterable(x))),
            [rdd]*n
        ).filter(lambda x: len(set(x))==n)
    

    Essentially the function chains n-1 Cartesian products of your rdd with itself and keeps only the rows where all of the values are distinct.
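
    To see why chain.from_iterable is needed, note that after a cartesian step a row is nested, e.g. (('P', 'T'), 'K'), and has to be flattened back into a single flat tuple. A small driver-side sketch (purely illustrative, and relying on the fact that each element here is a single-character string):

    from itertools import chain

    row = (('P', 'T'), 'K')
    print(tuple(chain.from_iterable(row)))
    #('P', 'T', 'K')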

    We can test this out for n = [2, 3]:

    print(combinations_of_length_n(rdd1, n=2).collect())
    #[('P', 'T'), ('P', 'K'), ('T', 'P'), ('K', 'P'), ('T', 'K'), ('K', 'T')]
    
    print(combinations_of_length_n(rdd1, n=3).collect())
    #[('P', 'T', 'K'),
    # ('P', 'K', 'T'),
    # ('T', 'P', 'K'),
    # ('K', 'P', 'T'),
    # ('T', 'K', 'P'),
    # ('K', 'T', 'P')]
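
    As a quick sanity check (an extra step, not part of the original answer), the rows for a given n should match itertools.permutations of the distinct elements, ignoring the order in which the rows come back:

    from itertools import permutations

    expected = set(permutations(['P', 'T', 'K'], 3))
    actual = set(combinations_of_length_n(rdd1, n=3).collect())
    assert actual == expected   # both hold the six length-3 permutations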
    

    The final output that you want is just the union of these intermediate results with the original rdd (with its values mapped to 1-tuples).

    rdd1.map(lambda x: (x,))\
        .union(combinations_of_length_n(rdd1, 2))\
        .union(combinations_of_length_n(rdd1, 3)).collect()
    #[('P',),
    # ('T',),
    # ('K',),
    # ('P', 'T'),
    # ('P', 'K'),
    # ('T', 'P'),
    # ('K', 'P'),
    # ('T', 'K'),
    # ('K', 'T'),
    # ('P', 'T', 'K'),
    # ('P', 'K', 'T'),
    # ('T', 'P', 'K'),
    # ('K', 'P', 'T'),
    # ('T', 'K', 'P'),
    # ('K', 'T', 'P')]
    

    To generalize for any maximum tuple length:

    num_reps = 3
    reduce(
        lambda a, b: a.union(b),
        [
            combinations_of_length_n(rdd1.map(lambda x: (x,)), i+1)
            for i in range(num_reps)
        ]
    ).collect()
    #Same as above
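
    If you need this more than once, the same idea can be wrapped in a small helper; the name all_unique_combinations below is just illustrative, not part of the original answer:

    def all_unique_combinations(rdd, max_len):
        # wrap each element in a 1-tuple so every row has a consistent type
        base = rdd.map(lambda x: (x,))
        return reduce(
            lambda a, b: a.union(b),
            [combinations_of_length_n(base, i + 1) for i in range(max_len)]
        )

    all_unique_combinations(rdd1, 3).collect()
    #Same 15 rows as above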
    

    Note: Cartesian products are expensive operations and should be avoided when possible.
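
    If the number of distinct elements is small enough to collect to the driver, a cheaper alternative (an extra suggestion, not part of the original answer) is to build the permutations locally with itertools and parallelize the result once, avoiding the repeated cartesian calls:

    from itertools import permutations

    elems = rdd1.distinct().collect()
    rows = [t for n in range(1, len(elems) + 1) for t in permutations(elems, n)]
    rdd2 = sc.parallelize(rows)
    #rdd2.collect() returns the same 15 tuples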