
Pyspark Dataframe add condition to `reduce(add,(F.col(x) ... `


Let us consider a dataframe df as follows:

df.show()
+-----+-----+-----+-----+-----+
|col A|val_1|val_2|val_3|val_4|
+-----+-----+-----+-----+-----+
|city1|  100|  100|  200|  100|
|city2|  200|  300|  300|  100|
|city1|  100|  100|  100|  100|
|city2|  500|  200|  200|  200|
+-----+-----+-----+-----+-----+

If I want to add up the values in the val_i columns and put the result in a new column sum, I can do the following:

from functools import reduce
from operator import add
from pyspark.sql import functions as F
val_cols = [x for x in df.columns if 'val' in x]
df.withColumn('sum', (reduce(add,(F.col(x) for x in val_cols)))).show()
+-----+-----+-----+-----+-----+----+
|col A|val_1|val_2|val_3|val_4| sum|
+-----+-----+-----+-----+-----+----+
|city1|  100|  100|  200|  100| 500|
|city2|  200|  300|  300|  100| 900|
|city1|  100|  100|  100|  100| 400|
|city2|  500|  200|  200|  200|1100|
+-----+-----+-----+-----+-----+----+

How can I add a condition to the reduce(add, (F.col(x) ... argument? For example, suppose I only want to include values greater than 200. I tried this:

df.withColumn('sum', (reduce(add,(F.col(x) for x in val_cols if F.col(x)>200)))).show()

but got the following error:

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.


Solution

  • The Python if inside the generator is evaluated while the expression is being built, so Spark tries to coerce the Column comparison F.col(x) > 200 into a plain bool, which raises the error above. Instead, build the per-row condition ahead of the reduce with F.when(...).otherwise(...):

    reduce(
        add,
        [F.when(F.col(c) > 200, F.col(c)).otherwise(F.lit(0)) for c in val_cols]
    )