Tags: sorting, pyspark, grouping, rdd, counting

sort rdd by two values and get top 10 per group


Suppose I have the following RDD in pyspark, where each row is a list:

[foo, apple]
[foo, orange]
[foo, apple]
[foo, apple]
[foo, grape]
[foo, grape]
[foo, plum]
[bar, orange]
[bar, orange]
[bar, orange]
[bar, grape]
[bar, apple]
[bar, apple]
[bar, plum]
[scrog, apple]
[scrog, apple]
[scrog, orange]
[scrog, orange]
[scrog, grape]
[scrog, plum]

I would like to show the top 3 fruits (index 1) for each group (index 0), ordered by the count of each fruit. For the sake of simplicity, suppose I don't care much about ties (e.g. scrog has a count of 1 for both grape and plum; I don't care which one is shown).

My goal is output like:

foo, apple, 3
foo, grape, 2
foo, orange, 1
bar, orange, 3
bar, apple, 2
bar, plum, 1   # <------- NOTE: could also be "grape" with count 1
scrog, orange, 2  # <---------- NOTE: "scrog" has many ties, which is okay
scrog, apple, 2
scrog, grape, 1

I can think of a likely inefficient approach (rough sketch below):

  • get unique groups and .collect() as list
  • filter full rdd by group, count and sort fruits
  • use something like zipWithIndex() to get top 3 counts
  • save as new RDD with format (<group>, <fruit>, <count>)
  • union all RDDs at end
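
For concreteness, a rough and untested sketch of what I mean (variable names are just placeholders):

    # naive version: one Spark job per group
    groups = rdd.map(lambda x: x[0]).distinct().collect()

    per_group = []
    for g in groups:
        # count and sort this group's fruits
        counted = (rdd.filter(lambda x, g=g: x[0] == g)
                      .map(lambda x: (x[1], 1))
                      .reduceByKey(lambda a, b: a + b)
                      .sortBy(lambda x: x[1], ascending=False))
        # keep the top 3 and reshape to (group, fruit, count)
        top3 = (counted.zipWithIndex()
                       .filter(lambda x: x[1] < 3)
                       .map(lambda x, g=g: (g, x[0][0], x[0][1])))
        per_group.append(top3)

    result = sc.union(per_group)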

But I'm interested not only in more Spark-specific approaches, but also in ones that avoid potentially expensive operations like collect() and zipWithIndex().

As a bonus -- but not required -- if I did want to apply sorting/filtering to address ties, where might that best be accomplished?

Any advice much appreciated!

UPDATE: in my context I'm unable to use DataFrames; I must use RDDs only.


Solution

  • map and reduceByKey operations in pyspark

    Sum the counts with .reduceByKey, gather each group's (count, fruit) pairs with .groupByKey, then select the top 3 of each group with .map and heapq.nlargest.

    rdd = sc.parallelize([
        ["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
        ["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
        ["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
        ["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
        ["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
    ])
    
    from operator import add
    from heapq import nlargest
    
    n = 3
    
    # count each (group, fruit) pair, re-key by group as (count, fruit) pairs,
    # then keep the n largest counts per group
    results = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add) \
                 .map(lambda x: (x[0][0], (x[1], x[0][1]))).groupByKey() \
                 .map(lambda x: (x[0], nlargest(n, x[1])))
    
    print( results.collect() )
    # [('bar', [(3, 'orange'), (2, 'apple'), (1, 'plum')]),
    #  ('scrog', [(2, 'orange'), (2, 'apple'), (1, 'plum')]),
    #  ('foo', [(3, 'apple'), (2, 'grape'), (1, 'plum')])]
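
    Regarding the bonus question about ties: just as a sketch, one possible rule (count descending, then fruit name ascending) can be applied by swapping heapq.nlargest for an explicit sort with a composite key:

    # same pipeline as above, but break count ties alphabetically by fruit name
    results = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add) \
                 .map(lambda x: (x[0][0], (x[1], x[0][1]))).groupByKey() \
                 .map(lambda x: (x[0], sorted(x[1], key=lambda t: (-t[0], t[1]))[:n]))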
    

    Standard python

    For comparison, if you have a plain Python list instead of an RDD, the easiest way to do the grouping in Python is with dictionaries:

    data = [
        ["foo", "apple"], ["foo", "orange"], ["foo", "apple"], ["foo", "apple"],
        ["foo", "grape"], ["foo", "grape"], ["foo", "plum"], ["bar", "orange"],
        ["bar", "orange"], ["bar", "orange"], ["bar", "grape"], ["bar", "apple"],
        ["bar", "apple"], ["bar", "plum"], ["scrog", "apple"], ["scrog", "apple"],
        ["scrog", "orange"], ["scrog", "orange"], ["scrog", "grape"], ["scrog", "plum"]
    ]
    
    from heapq import nlargest
    from operator import itemgetter
    
    d = {}  # nested dict of counts: {group: {fruit: count}}
    for k, v in data:
        d.setdefault(k, {})
        d[k][v] = d[k].get(v, 0) + 1
    print(d)
    # {'foo': {'apple': 3, 'orange': 1, 'grape': 2, 'plum': 1}, 'bar': {'orange': 3, 'grape': 1, 'apple': 2, 'plum': 1}, 'scrog': {'apple': 2, 'orange': 2, 'grape': 1, 'plum': 1}}
    
    n = 3
    results = [(k,v,c) for k,subd in d.items()
                       for v,c in nlargest(n, subd.items(), key=itemgetter(1))]
    print(results)
    # [('foo', 'apple', 3), ('foo', 'grape', 2), ('foo', 'orange', 1), ('bar', 'orange', 3), ('bar', 'apple', 2), ('bar', 'grape', 1), ('scrog', 'apple', 2), ('scrog', 'orange', 2), ('scrog', 'grape', 1)]