RDD_Input = [(('377', '80'), ('1', '4')), (('377', '510'), ('1', '5')), (('377', '79'), ('1', '4')), (('377', '791'), ('1', '1')), (('377', '511'), ('1', '4')), (('377', '433'), ('1', '3')), (('377', '687'), ('1', '1')), (('377', '456'), ('1', '1')), (('377', '399'), ('1', '4')), (('377', '96'), ('1', '5')), (('377', '780'), ('1', '1')), (('377', '683'), ('1', '1')), (('377', '403'), ('1', '5')), (('377', '999'), ('1', '4')), (('377', '502'), ('1', '4')), (('377', '435'), ('1', '5')), (('377', '550'), ('1', '5')), (('377', '948'), ('1', '1')), (('377', '393'), ('1', '4')), (('377', '648'), ('1', '4'))]
The input RDD is in key-value pairs ((movie1, movie2), (rating1, rating2)).
How do I transform the RDD into ((movie1, movie2), (rating1, rating2), (rating3, rating4), (rating5, rating6), ...)?
Expected result example: (('377', '399'), ('1', '4'), ('1', '4')), with ('377', '399') being the key and every rating tuple that shares that key appended after it.
The requirement is to use only the RDD API.
It seems you want to first groupByKey, putting the values for each key into a list, and then map each (key, list) pair into a single flat tuple.
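from pyspark import SparkContext
sc = SparkContext.getOrCreate()

data = [(('a', 'b'), ('1', '4')), (('a', 'b'), ('3', '5')), (('c', 'd'), ('2', '2'))]
rdd = sc.parallelize(data)
# Collect every rating tuple that shares a key into a list
rdd = rdd.groupByKey().mapValues(list)
# Flatten: (key, [v1, v2, ...]) -> (key, v1, v2, ...)
rdd = rdd.map(lambda x: (x[0], *x[1]))
print(rdd.collect())
# [(('c', 'd'), ('2', '2')), (('a', 'b'), ('1', '4'), ('3', '5'))]
# (the order of keys, and of values within a key, is not guaranteed)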
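If you want to check the shape of the result without starting Spark, the same group-then-flatten logic can be sketched in plain Python. This is only an illustration, not Spark code: group_and_flatten is a hypothetical helper, and a dict stands in for groupByKey. Unlike Spark, a Python dict keeps insertion order, so the output order here is deterministic.

```python
from collections import defaultdict

def group_and_flatten(pairs):
    """Hypothetical plain-Python stand-in for
    rdd.groupByKey().mapValues(list).map(lambda x: (x[0], *x[1])).

    Gathers every value tuple under its key (like groupByKey), then emits
    one flat tuple per key: (key, value1, value2, ...).
    """
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # Star-unpacking appends the grouped values after the key
    return [(key, *values) for key, values in grouped.items()]

data = [(('a', 'b'), ('1', '4')), (('a', 'b'), ('3', '5')), (('c', 'd'), ('2', '2'))]
print(group_and_flatten(data))
# [(('a', 'b'), ('1', '4'), ('3', '5')), (('c', 'd'), ('2', '2'))]
```

Two records with the key ('377', '399') and rating ('1', '4') would produce (('377', '399'), ('1', '4'), ('1', '4')), matching the expected result in the question.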