apache-spark, pyspark, apache-spark-sql, rdd, key-value

Group and merge RDD pair keys and values


    RDD_Input = [(('377', '80'), ('1', '4')), (('377', '510'), ('1', '5')), (('377', '79'), ('1', '4')), (('377', '791'), ('1', '1')), (('377', '511'), ('1', '4')), (('377', '433'), ('1', '3')), (('377', '687'), ('1', '1')), (('377', '456'), ('1', '1')), (('377', '399'), ('1', '4')), (('377', '96'), ('1', '5')), (('377', '780'), ('1', '1')), (('377', '683'), ('1', '1')), (('377', '403'), ('1', '5')), (('377', '999'), ('1', '4')), (('377', '502'), ('1', '4')), (('377', '435'), ('1', '5')), (('377', '550'), ('1', '5')), (('377', '948'), ('1', '1')), (('377', '393'), ('1', '4')), (('377', '648'), ('1', '4'))]

The input RDD is in key-value pairs ((movie1, movie2), (rating1, rating2)).

How do I transform the RDD into ((movie1, movie2), (rating1, rating2), (rating3, rating4), (rating5, rating6), ...)?

Expected result example: (('377', '399'), ('1', '4'), ('1', '4'))

('377', '399') is the key; every rating tuple that shares this key is appended after it.

The requirement is to use the RDD API only.


Solution

  • It seems you want to first groupByKey, collecting the values for each key into a list, and then map each grouped record into the flattened tuple.

    # `sc` is the active SparkContext (created automatically in pyspark shells).
    data = [(('a', 'b'), ('1', '4')), (('a', 'b'), ('3', '5')), (('c', 'd'), ('2', '2'))]
    rdd = sc.parallelize(data)
    
    # Gather all value tuples per key into a list, then splice that list
    # into the result tuple right after the key.
    rdd = rdd.groupByKey().mapValues(list)
    rdd = rdd.map(lambda x: (x[0], *x[1]))
    
    print(rdd.collect())
    # [(('c', 'd'), ('2', '2')), (('a', 'b'), ('1', '4'), ('3', '5'))]
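
  • If the per-key lists can grow large, here is a minimal sketch of the same transformation using aggregateByKey instead of groupByKey (an alternative I'm suggesting, not part of the question): it builds each key's list incrementally and lets Spark combine values map-side before the shuffle, whereas groupByKey ships every value across the network. It reuses `data` and `sc` from above.

    # Same result as the groupByKey version, built with aggregateByKey.
    rdd = sc.parallelize(data)
    rdd = rdd.aggregateByKey(
        [],                          # zero value: start each key with an empty list
        lambda acc, v: acc + [v],    # fold one rating tuple into the partition-local list
        lambda a, b: a + b,          # merge partial lists from different partitions
    ).map(lambda x: (x[0], *x[1]))
    
    print(rdd.collect())
    # Same records as above; ordering may differ between runs.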