Tags: python, apache-spark, pyspark, rdd

Joining two RDDs with multiple value components and flattening the result


I have two RDDs that share the same keys but have different value components (rdd1 carries more than one value per key). I want to join these RDDs by key and concatenate all of the values into a single flat tuple (see below). What's the best way to do this?

rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])

Desired Output RDD

[ (1, "Foo", "test1", [5,6,7]), (2, "Bar", "test2", [1,2,3]) ]

Doing a direct join does not work:

print(rdd2.join(rdd1).collect())
#[(1, ('Foo', 'test1')), (2, ('Bar', 'test2'))]

This drops the rest of the values in rdd1 (the list component is lost entirely), and the output is nested rather than flat.
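To see why the list disappears, it helps to model what a pair-RDD join does conceptually. The sketch below is an illustration in plain Python, not Spark's actual implementation: the join matches rows on element 0 and keeps only element 1 as the value, so any extra fields in rdd1's tuples are silently ignored.

```python
def simulated_pair_join(left, right):
    # Conceptual model of RDD.join on pair RDDs: match on element 0 (the key)
    # and keep only element 1 (the value); extra tuple fields are dropped.
    joined = []
    for l in left:
        for r in right:
            if l[0] == r[0]:                          # keys match
                joined.append((l[0], (l[1], r[1])))   # third field of r ignored
    return joined

rdd2_data = [(1, "Foo"), (2, "Bar")]
rdd1_data = [(1, "test1", [5, 6, 7]), (2, "test2", [1, 2, 3])]
print(simulated_pair_join(rdd2_data, rdd1_data))
# [(1, ('Foo', 'test1')), (2, ('Bar', 'test2'))]
```

This mirrors the output shown above: the `[5, 6, 7]` and `[1, 2, 3]` lists never make it into the joined rows.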


Solution

  • You can use join here, provided that you first map the RDDs into the (key, value) form that join expects.

    rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
    rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])
    
    def map_to_kvp(row):
        # Already a (key, value) pair: leave it unchanged
        if len(row) < 3:
            return row
        # Pack everything after the key into a single value tuple
        return (row[0], tuple(row[1:]))
    
    rdd3 = rdd2.map(map_to_kvp).join(rdd1.map(map_to_kvp))
    print(rdd3.collect())
    #[
    #    (1, ('Foo', ('test1', [5, 6, 7]))), 
    #    (2, ('Bar', ('test2', [1, 2, 3])))
    #]
    

    Now all of the data is in the right places; you just need to flatten the nested rows.

    In this case you'll have to write your own flatten function so that the strings and lists are not flattened as well.

    We can build on the answer to How can I flatten lists without splitting strings? to write such a function:

    def flatten(foo):
        for x in foo:
            # Recurse only into iterables that are neither strings nor lists,
            # so "test1" and [5, 6, 7] survive intact
            if hasattr(x, '__iter__') and not isinstance(x, str) and not isinstance(x, list):
                for y in flatten(x):
                    yield y
            else:
                yield x
    
    rdd4 = rdd3.map(lambda row: tuple(flatten(row)))
    print(rdd4.collect())
    #[(1, 'Foo', 'test1', [5, 6, 7]), (2, 'Bar', 'test2', [1, 2, 3])]
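As an alternative to a generic flatten, note that every joined row here has the same fixed shape, `(key, (value_from_rdd2, (value_from_rdd1, list_from_rdd1)))`, so you could unpack it explicitly. The sketch below runs on plain Python data for illustration; in Spark you would pass `flatten_row` to `rdd3.map` instead of the lambda above.

```python
def flatten_row(row):
    # row has the fixed shape produced by the join:
    # (key, (value_from_rdd2, (value_from_rdd1, list_from_rdd1)))
    key, (v2, v1) = row
    return (key, v2) + v1

joined = [
    (1, ('Foo', ('test1', [5, 6, 7]))),
    (2, ('Bar', ('test2', [1, 2, 3]))),
]
print([flatten_row(r) for r in joined])
# [(1, 'Foo', 'test1', [5, 6, 7]), (2, 'Bar', 'test2', [1, 2, 3])]
```

Explicit unpacking is less general than the recursive flatten, but it fails loudly on malformed rows instead of silently producing a differently shaped tuple.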