I have two RDDs with the same key but different value types (one of them has more than two elements per record). I want to join these RDDs by key and have all of their values appear side by side in a single flat tuple (see below). What's the best way to do this?
rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])
Desired Output RDD
[ (1, "Foo", "test1", [5,6,7]), (2, "Bar", "test2", [1,2,3]) ]
Doing a direct join does not work:
print(rdd2.join(rdd1).collect())
#[(1, ('Foo', 'test1')), (2, ('Bar', 'test2'))]
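This ignores the rest of the values in rdd1, because join reads each record as a (key, value) pair and drops everything after the second element. The output is also in the wrong format: nested rather than flat.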
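To see why the extra values disappear, here is a pure-Python sketch (no Spark required) that mimics the output above. The helper `naive_join` is hypothetical and only models an inner join on local lists; it is not PySpark's implementation, but like a pair-RDD join it reads only positions 0 and 1 of each record.

```python
from collections import defaultdict


def naive_join(left, right):
    """Inner-join two lists of records, treating each record as (key, value).

    Only the first two elements of each record are used, which mirrors why
    rdd2.join(rdd1) loses the trailing list in rdd1.
    """
    # Index the right-hand side by key, keeping only element [1] as the value.
    index = defaultdict(list)
    for rec in right:
        index[rec[0]].append(rec[1])
    # Emit (key, (left_value, right_value)) for every matching key.
    return [(rec[0], (rec[1], v)) for rec in left for v in index[rec[0]]]


rows1 = [(1, "test1", [5, 6, 7]), (2, "test2", [1, 2, 3])]
rows2 = [(1, "Foo"), (2, "Bar")]
print(naive_join(rows2, rows1))
# [(1, ('Foo', 'test1')), (2, ('Bar', 'test2'))]
```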
You can use join here, provided that you first map the RDDs into the form (key, value).
rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])
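def map_to_kvp(row):
    # Rows that are already (key, value) pairs are returned unchanged;
    # longer rows get all of their non-key fields packed into one tuple value.
    if len(row) < 3:
        return row
    return (row[0], tuple(row[1:]))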
rdd3 = rdd2.map(map_to_kvp).join(rdd1.map(map_to_kvp))
print(rdd3.collect())
#[
# (1, ('Foo', ('test1', [5, 6, 7]))),
# (2, ('Bar', ('test2', [1, 2, 3])))
#]
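If you want to check the mapping step without a cluster, this sketch applies `map_to_kvp` (redefined so the block runs on its own) to plain lists and joins them with a hypothetical `local_join` helper that models an inner join on (key, value) pairs. With Spark you would call `.join` on the mapped RDDs instead; note that Spark does not guarantee the order of the joined rows, whereas this local version preserves list order.

```python
from collections import defaultdict


def map_to_kvp(row):
    # Two-element rows are already (key, value); longer rows get their
    # non-key fields packed into a single tuple value.
    if len(row) < 3:
        return row
    return (row[0], tuple(row[1:]))


def local_join(left, right):
    # Hypothetical stand-in for RDD.join on (key, value) pairs: inner join by key.
    index = defaultdict(list)
    for k, v in right:
        index[k].append(v)
    return [(k, (v, w)) for k, v in left for w in index[k]]


rows1 = [(1, "test1", [5, 6, 7]), (2, "test2", [1, 2, 3])]
rows2 = [(1, "Foo"), (2, "Bar")]
joined = local_join([map_to_kvp(r) for r in rows2], [map_to_kvp(r) for r in rows1])
print(joined)
# [(1, ('Foo', ('test1', [5, 6, 7]))), (2, ('Bar', ('test2', [1, 2, 3])))]
```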
Now all of the data is in the right place; you just need to flatten the resulting rows.
In this case you'll have to write your own flatten function, because a generic flatten would also break apart the string and the list in each row.
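For example, a generic one-level flatten such as `itertools.chain.from_iterable` treats a string as an iterable of characters, so it splits "Foo" apart. This plain-Python illustration applies it to the value part of one joined row:

```python
from itertools import chain

# Value part of one joined row: ('Foo', ('test1', [5, 6, 7]))
value = ("Foo", ("test1", [5, 6, 7]))

# chain.from_iterable iterates into every element, including the string "Foo".
naive = list(chain.from_iterable(value))
print(naive)
# ['F', 'o', 'o', 'test1', [5, 6, 7]]
```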
We can build upon this answer to "How can I flatten lists without splitting strings?" to make our own function:
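def flatten(foo):
    # Recursively yield the leaves of nested iterables, but treat strings and
    # lists as atomic values so that "Foo" and [5, 6, 7] are kept intact.
    for x in foo:
        if hasattr(x, '__iter__') and not isinstance(x, (str, list)):
            yield from flatten(x)
        else:
            yield x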
rdd4 = rdd3.map(lambda row: tuple(flatten(row)))
print(rdd4.collect())
#[(1, 'Foo', 'test1', [5, 6, 7]), (2, 'Bar', 'test2', [1, 2, 3])]
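If every joined row always has the same shape, (key, (rdd2_value, rdd1_values)), a recursive flatten isn't strictly necessary: you can unpack the row explicitly. This is a minimal sketch assuming that fixed shape; `unpack_row` is a hypothetical name, and with Spark you would pass it as `rdd3.map(unpack_row)`. Here it is applied to a local copy of the collected rows.

```python
def unpack_row(row):
    # Assumes row == (key, (v2, v1)), where v1 is the tuple built by map_to_kvp.
    key, (v2, v1) = row
    # Put the key and rdd2's value first, then splice in rdd1's packed fields.
    return (key, v2) + tuple(v1)


rows = [(1, ("Foo", ("test1", [5, 6, 7]))), (2, ("Bar", ("test2", [1, 2, 3])))]
print([unpack_row(r) for r in rows])
# [(1, 'Foo', 'test1', [5, 6, 7]), (2, 'Bar', 'test2', [1, 2, 3])]
```

The trade-off: explicit unpacking is simpler and never descends into other nested values by accident, but it breaks if the row shape changes, whereas flatten handles arbitrary nesting of tuples.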