apache-spark, pyspark, count, rdd, reduce

Row count based on second column in RDD?


Here is an example of the RDD:

joinRDD = (productdict.join(result))
# [('B000002KXA', ('Music', ['AVM91SKZ9M58T', "This remix and single version of Madonna's song Resume Me [Immaculate Collection Album] is one of Madonna's best. Not only does it show the true ability of Madonna's vocal ability but the power this song brings to your heart. Madonna's voice in this single is unlike any other song she has sang, beautifully put together and mastered. A song everyone remembers from Madonna's long list of credits. This CD is one not to miss either you love Madonna or love-to-hate her, you must have it, a collection item!!! END", '5.0', 97]))]

Due to the nested tuple in the value, I'm not sure how to count rows based on the second column ('Music').

Doing the following does not work:

from operator import add

joinRDD2 = joinRDD.mapValues(lambda x: (x[1], 1)).reduceByKey(add)

Solution

  • The thing is, your second "column" is not ('Music'); it's actually the whole tuple...
    ('Music', ['AVM91SKZ9M58T', "This remix ...", '5.0', 97])

    With map, x is the full row, so x[1] returns this whole column, while you need only 'Music', which is the first element in that column. So, instead of x[1] you should use x[1][0]. Note also that mapValues keeps the original key ('B000002KXA'), so even with the right index, reduceByKey would still group by product ID rather than by category; use map so the category itself becomes the new key.

    Example RDD:

    from operator import add
    
    joinRDD = sc.parallelize([
        ('B000002KXA', ('Music', ['AVM91SKZ9M58T', "This remix", 4])),
        ('B000002KXA', ('Music', ['AVM91SKZ9M58T', "This remix", 4])),
        ('B000002KXA', ('Drama', ['AVM91SKZ9M58T', "This remix", 4])),
    ])
    

    Test:

    # key each row by its category (x[1][0]), then sum the 1s per category
    joinRDD2 = joinRDD.map(lambda x: (x[1][0], 1)).reduceByKey(add)
    
    joinRDD2.collect()
    # [('Drama', 1), ('Music', 2)]
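
    As a side note, if you only need the counts back on the driver rather than as an RDD, countByValue can replace the map/reduceByKey pair. A minimal sketch, assuming the same joinRDD as above:

    # pull out just the category and count occurrences of each value;
    # countByValue returns a defaultdict on the driver, not an RDD
    counts = joinRDD.map(lambda x: x[1][0]).countByValue()
    
    dict(counts)
    # {'Music': 2, 'Drama': 1}

    The reduceByKey version keeps the result distributed, which is useful if you want to keep transforming it; countByValue collects everything to the driver, so prefer reduceByKey when the number of distinct categories is large.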