apache-spark, pyspark, rdd

ReduceByKey for two columns and count rows RDD


My data consists of four columns: the chart type, the name of a song, the position of the song in the chart, and the day the song held that position. How can I find the total number of days each song spent at #1 in each chart? I want my result to look like: chart_type, song, days in #1

First I filter on the chart position and keep only #1. What should I do next? ReduceByKey on the song, then reduce on the chart type, and then count the records to find the total days at #1 for each song in each chart type?

('top200', '501', '1', '2021-03-26T00:00:00.000+02:00')
('top200', '501', '1', '2021-03-27T00:00:00.000+02:00')
('top200', '501', '1', '2021-03-28T00:00:00.000+02:00')
('viral50', 'Gowtu', '1', '2017-03-17T00:00:00.000+02:00')
('viral50', 'Gowtu', '1', '2017-03-18T00:00:00.000+02:00')
('viral50', 'Gowtu', '1', '2017-03-19T00:00:00.000+02:00')
('top200', 'Lonely (with benny blanco)', '1', '2020-11-09T00:00:00.000+02:00')
('top200', 'Lonely (with benny blanco)', '1', '2020-11-10T00:00:00.000+02:00')
('top200', 'Lonely (with benny blanco)', '1', '2020-11-11T00:00:00.000+02:00')

Thank you


Solution

  • If you want to do this using an RDD and you need a count grouped by the first two elements (chart type and song), we can do the following; a sketch of the full pipeline, including the position filter mentioned in the question, follows this example.

    data_ls = [
        ('top200', '501', '1', '2021-03-26T00:00:00.000+02:00'),
        ('top200', '501', '1', '2021-03-27T00:00:00.000+02:00'),
        ('top200', '501', '1', '2021-03-28T00:00:00.000+02:00'),
        ('viral50', 'Gowtu', '1', '2017-03-17T00:00:00.000+02:00'),
        ('viral50', 'Gowtu', '1', '2017-03-18T00:00:00.000+02:00'),
        ('viral50', 'Gowtu', '1', '2017-03-19T00:00:00.000+02:00'),
        ('top200', 'Lonely (with benny blanco)', '1', '2020-11-09T00:00:00.000+02:00'),
        ('top200', 'Lonely (with benny blanco)', '1', '2020-11-10T00:00:00.000+02:00'),
        ('top200', 'Lonely (with benny blanco)', '1', '2020-11-11T00:00:00.000+02:00')
    ]
    
    data_rdd = spark.sparkContext.parallelize(data_ls)
    
    from operator import add
    
    # Key each row by (chart_type, song) and sum the 1s -- each row is one day at that position.
    data_rdd. \
        map(lambda gk: ((gk[0], gk[1]), 1)). \
        reduceByKey(add). \
        collect()
    
    # [(('top200', '501'), 3),
    #  (('top200', 'Lonely (with benny blanco)'), 3),
    #  (('viral50', 'Gowtu'), 3)]
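
  • Since the question starts from unfiltered data and wants the output as chart_type, song, days in #1, here is a minimal sketch of the complete pipeline. It reuses `data_rdd` from above (on the full dataset the RDD would contain every chart position, not just '1'): filter to position #1, count the days per (chart type, song), then flatten the key back out into a triple.

    from operator import add
    
    days_at_no1 = (
        data_rdd
        .filter(lambda row: row[2] == '1')            # keep only rows where the position is #1
        .map(lambda row: ((row[0], row[1]), 1))       # key by (chart_type, song), one day per row
        .reduceByKey(add)                             # sum the days per key
        .map(lambda kv: (kv[0][0], kv[0][1], kv[1]))  # flatten to (chart_type, song, days in #1)
    )
    
    days_at_no1.collect()
    
    # e.g. (ordering after collect() may vary):
    # [('top200', '501', 3),
    #  ('top200', 'Lonely (with benny blanco)', 3),
    #  ('viral50', 'Gowtu', 3)]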