I work with Data Frame in PySpark I have the following task: check how many "times" values from each column was > 2 for all columns. For u1 it is 0, for u2 => 2 and etc
user a b c d times
u1 1 0 1 0 0
u2 0 1 4 3 2
u3 2 1 7 0 1
My solution below. It works, I'm not sure that it is the best way and didn't try on real big data yet. I don't like transform to rdd and back to data frame. Is there anything better? I thouth in the beginning to claculate by UDF per columns, but didn't find a way to accamulte and sum all results per row:
def calculate_times(row):
times = 0
for index, item in enumerate(row):
if not isinstance(item, basestring):
if item > 2:
times = times+1
return times
def add_column(pair):
return dict(pair[0].asDict().items() + [("is_outlier", pair[1])])
def calculate_times_for_all(df):
rdd_with_times = df.map(lambda row: (calculate_times(row))
rdd_final = df.rdd.zip(rdd_with_times).map(add_column)
df_final = sqlContext.createDataFrame(rdd_final)
return df_final
for this solution i used this topic How do you add a numpy.array as a new column to a pyspark.SQL DataFrame?
Thanks!
It is just a simple one-liner. Example data:
df = sc.parallelize([
("u1", 1, 0, 1, 0), ("u2", 0, 1, 4, 3), ("u3", 2, 1, 7, 0)
]).toDF(["user", "a", "b", "c", "d"])
withColumn
:
df.withColumn("times", sum((df[c] > 2).cast("int") for c in df.columns[1:]))
and the result:
+----+---+---+---+---+-----+
|user| a| b| c| d|times|
+----+---+---+---+---+-----+
| u1| 1| 0| 1| 0| 0|
| u2| 0| 1| 4| 3| 2|
| u3| 2| 1| 7| 0| 1|
+----+---+---+---+---+-----+
Note:
It columns are nullable
you should correct for that, for example using coalesce
:
from pyspark.sql.functions import coalesce
sum(coalesce((df[c] > 2).cast("int"), 0) for c in df.columns[1:])