Tags: python, apache-spark, pyspark, filter, rdd

How to filter RDD by attribute/key and then apply function using pyspark?


I have some example data:

my_data = [{'id': '001', 'name': 'Sam', 'class': "classA", 'age': 15, 'exam_score': 90},
           {'id': '002', 'name': 'Tom', 'class': "classA", 'age': 15, 'exam_score': 78},
           {'id': '003', 'name': 'Ben', 'class': "classB", 'age': 16, 'exam_score': 91},
           {'id': '004', 'name': 'Max', 'class': "classB", 'age': 16, 'exam_score': 76},
           {'id': '005', 'name': 'Ana', 'class': "classA", 'age': 15, 'exam_score': 88},
           {'id': '006', 'name': 'Ivy', 'class': "classA", 'age': 16, 'exam_score': 77},
           {'id': '007', 'name': 'Eva', 'class': "classB", 'age': 15, 'exam_score': 86},
           {'id': '008', 'name': 'Zoe', 'class': "classB", 'age': 16, 'exam_score': 89}]

my_rdd = sc.parallelize(my_data)
my_rdd

Let's say I have some simple function:

def divide_by_100(value):
  return value/100

The goal is to divide all exam scores by 100 with the function, and then find the minimum score. My idea is to:

  • filter my_rdd by key, so that only the exam_score values remain
  • apply the divide_by_100() function to this
  • use the .min() and .collect() functions to print the lowest exam score in the data

I'm aware that groupByKey() could also be used.

The problem is that I don't know how to put this into practice using pyspark. I would appreciate any help with this.


Solution

  • the minimum score without any keys:

    score_rdd = my_rdd.map(lambda my_dict: my_dict['exam_score']/100)
    print(score_rdd.min())
    # 0.76
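
    the asker's divide_by_100() function can also be passed to map() directly instead of an inline lambda; a minimal sketch of that variant, assuming the my_rdd and divide_by_100 definitions from the question:

    # same result, reusing the named function instead of an inline lambda
    score_rdd = my_rdd.map(lambda my_dict: my_dict['exam_score']).map(divide_by_100)
    print(score_rdd.min())
    # 0.76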
    

    this is a solution using the pyspark RDD API:

    score_rdd = my_rdd.map(lambda my_dict: (my_dict['class'], my_dict['exam_score']/100))
    print(score_rdd.collect())
    # [('classA', 0.9), ('classA', 0.78), ('classB', 0.91), ('classB', 0.76), ('classA', 0.88), ('classA', 0.77), ('classB', 0.86), ('classB', 0.89)]
    from builtins import min  # ensure Python's built-in min is used, not pyspark.sql.functions.min
    class_min_rdd = score_rdd.reduceByKey(min)
    print(class_min_rdd.collect())
    # [('classA', 0.77), ('classB', 0.76)]
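
    since the question mentions groupByKey(), here is a sketch of that alternative; it collects all scores per class and takes the built-in min of each group (reduceByKey is usually preferred because it combines values before shuffling). The variable name grouped_min_rdd is only illustrative:

    # alternative using groupByKey(), as mentioned in the question
    grouped_min_rdd = score_rdd.groupByKey().mapValues(min)
    print(grouped_min_rdd.collect())
    # [('classA', 0.77), ('classB', 0.76)]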
    

    this is a solution using the pyspark DataFrame API:

    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    
    df = my_rdd.toDF()
    df.printSchema()
    # root
    #  |-- age: long (nullable = true)
    #  |-- class: string (nullable = true)
    #  |-- exam_score: long (nullable = true)
    #  |-- id: string (nullable = true)
    #  |-- name: string (nullable = true)
    df = df.withColumn('score', col('exam_score')/100).groupBy('class').agg(min('score').alias('score'))
    df.show(10, False)
    # +------+-----+
    # |class |score|
    # +------+-----+
    # |classB|0.76 |
    # |classA|0.77 |
    # +------+-----+
    print(df.collect())
    # [Row(class='classB', score=0.76), Row(class='classA', score=0.77)]
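
    if only the overall minimum is needed (no per-class key), the DataFrame API can aggregate without a groupBy; a small sketch, rebuilding the DataFrame from my_rdd since df was reassigned above (overall_df and the min_score alias are only illustrative names):

    # overall minimum score across all rows, no grouping key
    # here min refers to pyspark.sql.functions.min from the wildcard import above
    overall_df = my_rdd.toDF().agg(min(col('exam_score')/100).alias('min_score'))
    print(overall_df.collect())
    # [Row(min_score=0.76)]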