apache-spark, dataframe, pyspark, apache-spark-sql, rdd

Calculate per row and add new column in DataFrame PySpark - better solution?


I work with DataFrames in PySpark and have the following task: for each row, count across all columns how many values are > 2 and store that count in a new column, times. For u1 it is 0, for u2 it is 2, and so on.

user    a   b   c   d   times
u1      1   0   1   0   0
u2      0   1   4   3   2
u3      2   1   7   0   1

My solution is below. It works, but I'm not sure it is the best way, and I haven't tried it on really big data yet. I don't like converting to an RDD and back to a DataFrame. Is there anything better? At first I thought about calculating per column with a UDF, but I didn't find a way to accumulate and sum all the results per row:

def calculate_times(row):
    # count how many numeric values in this row are > 2
    times = 0
    for item in row:
        if not isinstance(item, basestring):
            if item > 2:
                times = times + 1
    return times

def add_column(pair):
    # merge the original Row (as a dict) with the computed count
    return dict(pair[0].asDict().items() + [("is_outlier", pair[1])])

def calculate_times_for_all(df):
    rdd_with_times = df.rdd.map(lambda row: calculate_times(row))
    rdd_final = df.rdd.zip(rdd_with_times).map(add_column)

    df_final = sqlContext.createDataFrame(rdd_final)
    return df_final
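
Usage is then (a quick sketch, assuming a DataFrame df shaped like the table above and the usual sqlContext):

df_final = calculate_times_for_all(df)
df_final.show()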

For this solution I used this topic: How do you add a numpy.array as a new column to a pyspark.SQL DataFrame?

Thanks!


Solution

  • It is just a simple one-liner. Example data:

    df = sc.parallelize([
        ("u1", 1, 0, 1, 0), ("u2", 0, 1, 4, 3), ("u3", 2, 1, 7, 0)
    ]).toDF(["user", "a", "b", "c", "d"])
    

    withColumn:

    df.withColumn("times", sum((df[c] > 2).cast("int") for c in df.columns[1:]))
    

    and the result:

    +----+---+---+---+---+-----+
    |user|  a|  b|  c|  d|times|
    +----+---+---+---+---+-----+
    |  u1|  1|  0|  1|  0|    0|
    |  u2|  0|  1|  4|  3|    2|
    |  u3|  2|  1|  7|  0|    1|
    +----+---+---+---+---+-----+
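
    The built-in sum works here because Column overloads +, so the generator of (df[c] > 2).cast("int") expressions is folded into a single column expression (the initial 0 is handled by Column's __radd__). An equivalent, more explicit spelling, useful for example when sum has been shadowed by a star import from pyspark.sql.functions, is:

    from functools import reduce
    from operator import add

    df.withColumn("times", reduce(add, [(df[c] > 2).cast("int") for c in df.columns[1:]]))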
    

    Note:

    If the columns are nullable you should correct for that, for example using coalesce:

    from pyspark.sql.functions import coalesce, lit

    sum(coalesce((df[c] > 2).cast("int"), lit(0)) for c in df.columns[1:])
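
    A quick end-to-end check of the nullable case (a sketch; the None values below are made up just for illustration):

    df_nulls = sc.parallelize([
        ("u1", 1, None, 1, 0), ("u2", 0, 1, 4, 3)
    ]).toDF(["user", "a", "b", "c", "d"])

    # without coalesce, NULL > 2 evaluates to NULL and would make the whole
    # sum NULL for u1; coalesce turns that NULL into 0 so the count stays correct
    df_nulls.withColumn(
        "times",
        sum(coalesce((df_nulls[c] > 2).cast("int"), lit(0)) for c in df_nulls.columns[1:])
    ).show()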