apache-spark, pyspark, rdd

PySpark: Using reduceByKey on list values


I am trying to get a better understanding of the reduceByKey function and have been exploring ways of using it to complete different tasks. I would like to apply it to the RDD data shown below. Each row of data is a tuple containing a name and a list of all the dates associated with that name (below is a copy of how the data looks):

data = [("Cassavetes, Frank", ['2012', '2002', '2009', '2005']),
("Knight, Shirley (I)", ['1997', '2002', '2009']),
("Yip, Françoise", ['2007', '2004', '2000']),
("Danner, Blythe", ['2000', '2008', '2012', '2010', '2004', '2004', '1999', '1998']),
("Buck (X)", ['2002', '2006', '2009'])]

In an attempt to get a count of all the dates associated with each name, I applied the code below, using the reduceByKey function to try to convert each list of dates into a count of the dates it contains.

rdd = spark.sparkContext.parallelize(data)
reducedRdd = rdd.reduceByKey( lambda a,b: len(a.split(" ")) + len(b.split(" ")) )
reducedRdd.take(1)

The code above produces the same results as the input data and does not carry out the transformation in the reduce function. Below is an example of the code's output:

[('Yip, Françoise', ['2007', '2004', '2000'])]

The output I expected is as follows:

[("Yip, Françoise", 3)]

Why does the code I wrote above not give me my expected output, and how would I alter it so that it does?


Solution

  • You're looking for a map, not a reduceByKey. There is nothing to reduce: reduceByKey only merges multiple values that share a key, and each key in your data appears exactly once, so the lambda is never called and you get back an RDD with the original values untouched (a small demonstration follows the mapValues example below). Even if the lambda were called, it would fail, because the values are lists, which have no split method.

    rdd2 = rdd.map(lambda x: (x[0], len(x[1])))
    
    print(rdd2.collect())
    # [('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Yip, Françoise', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]
    

    mapValues might be even more appropriate, since it applies the function to each value while leaving the keys untouched:

    rdd2 = rdd.mapValues(len)
    
    print(rdd2.collect())
    # [('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Yip, Françoise', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]
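
    To see why the original call was a no-op, note that reduceByKey only invokes its function to merge two values of the same key. A minimal sketch with a made-up toy RDD: each key occurs once, so the function is never called and every value passes through unchanged:

    demo = spark.sparkContext.parallelize([('a', [1]), ('b', [2])])
    # the lambda below is never invoked, because no key has two values to merge
    print(demo.reduceByKey(lambda a, b: 'never called').collect())
    # [('a', [1]), ('b', [2])]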
    

    If you want to use reduceByKey, your data should be ungrouped, i.e. one (name, date) pair per record. For example, if you have

    data = [('Cassavetes, Frank', '2012'), ('Cassavetes, Frank', '2002'), ('Cassavetes, Frank', '2009'), ('Cassavetes, Frank', '2005'), ('Knight, Shirley (I)', '1997'), ('Knight, Shirley (I)', '2002'), ('Knight, Shirley (I)', '2009'), ('Yip, Françoise', '2007'), ('Yip, Françoise', '2004'), ('Yip, Françoise', '2000'), ('Danner, Blythe', '2000'), ('Danner, Blythe', '2008'), ('Danner, Blythe', '2012'), ('Danner, Blythe', '2010'), ('Danner, Blythe', '2004'), ('Danner, Blythe', '2004'), ('Danner, Blythe', '1999'), ('Danner, Blythe', '1998'), ('Buck (X)', '2002'), ('Buck (X)', '2006'), ('Buck (X)', '2009')]
    

    then you can do

    from operator import add
    
    rdd = spark.sparkContext.parallelize(data)
    rdd2 = rdd.map(lambda x: (x[0], 1)).reduceByKey(add)
    
    print(rdd2.collect())
    # [('Yip, Françoise', 3), ('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]
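
    If you would rather start from the original grouped data, you can ungroup it yourself with flatMap before reducing. A sketch, assuming the same data and spark session as in the question; the counts match what mapValues(len) gives (the order after the shuffle may vary):

    from operator import add
    
    rdd = spark.sparkContext.parallelize(data)  # the original (name, [dates]) tuples
    # emit one (name, date) pair per date, then count per name
    ungrouped = rdd.flatMap(lambda x: [(x[0], d) for d in x[1]])
    rdd2 = ungrouped.map(lambda x: (x[0], 1)).reduceByKey(add)
    
    print(rdd2.collect())
    # [('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Yip, Françoise', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]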