apache-spark, pyspark, apache-spark-sql, apache-spark-mllib

PySpark replace less frequent items with most frequent items


I have a categorical column in a DataFrame with several levels, and I would like to replace the less frequent levels (those whose frequency, as a percentage of the total, is below a specified threshold) with the most frequent level. How can I accomplish this in an elegant and compact way?

Below is an example: if I set the threshold to 0.3, then level "c" should be replaced by level "a", since its frequency is only 1/6, which is below 0.3.

from pyspark.sql import Row

row = Row("foo")
df = sc.parallelize([row("a"), row("b"), row("c"), row("a"), row("a"), row("b")]).toDF()

Solution

    from pyspark.sql import Row
    import pyspark.sql.functions as f

    # sample data
    row = Row("foo")
    df = sc.parallelize([row("a"), row("b"), row("c"), row("a"), row("a"), row("b")]).toDF()

    # relative frequency of each level
    df_temp = df.groupBy('foo').agg((f.count(f.lit(1)) / df.count()).alias("frequency"))

    # the single most frequent level
    most_frequent_foo = df_temp.sort(f.col('frequency').desc()).select('foo').first()[0]

    # map levels with frequency below 0.3 to the most frequent level
    df_temp = df_temp.withColumn('foo_replaced',
                                 f.when(f.col("frequency") < 0.3, f.lit(most_frequent_foo)).otherwise(f.col('foo')))

    # join the per-level mapping back onto the original frame and drop helper columns
    df_final = df.join(df_temp, df.foo == df_temp.foo, 'left').drop(df_temp.foo).drop("frequency")
    df_final.show()


    Output is:

    +---+------------+
    |foo|foo_replaced|
    +---+------------+
    |  c|           a|
    |  b|           b|
    |  b|           b|
    |  a|           a|
    |  a|           a|
    |  a|           a|
    +---+------------+
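
    If you need this for more than one column or threshold, the same logic can be wrapped in a small helper. The sketch below is one way to do it, under the same assumption of a running SparkContext as above; replace_rare_levels is just a name chosen here, not a PySpark built-in. Joining on the column name (on=col_name) instead of an explicit equality condition also keeps a single copy of the join key, so the extra drop from the answer is not needed.

    import pyspark.sql.functions as f

    def replace_rare_levels(df, col_name, threshold):
        # replace_rare_levels is a hypothetical helper, not part of PySpark
        total = df.count()
        # relative frequency of each level of col_name
        freqs = df.groupBy(col_name).agg(
            (f.count(f.lit(1)) / total).alias("frequency"))
        # pick the single most frequent level
        most_frequent = (freqs.sort(f.col("frequency").desc())
                              .select(col_name).first()[0])
        # per-level mapping: rare levels map to the most frequent one
        mapping = freqs.withColumn(
            col_name + "_replaced",
            f.when(f.col("frequency") < threshold, f.lit(most_frequent))
             .otherwise(f.col(col_name)))
        # joining on the column name keeps a single copy of the join key
        return df.join(mapping, on=col_name, how="left").drop("frequency")

    # e.g. replace_rare_levels(df, "foo", 0.3).show()

    The mapping frame has one row per level, so it is usually small enough that the join stays cheap; if the optimizer does not broadcast it on its own, you could hint it with f.broadcast(mapping).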