I have read a CSV file into a PySpark DataFrame. Now if I apply conditions in a when() clause, it works fine when the conditions are given before runtime:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import col
sc = SparkContext('local', 'example')
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# Sample content of csv file
# col1,value
# 1,aa
# 2,bbb
s_df = sql_sc.createDataFrame(pandas_df)
new_df = s_df.withColumn('value',
                         functions.when((col("col1") == 2) | (col("value") == "aa"),
                                        s_df.value).otherwise(2))
new_df.show(truncate=False)
But I need to build the conditions inside the when() clause dynamically, from a list like this:
[{'column': 'col1', 'operator': '==', 'value': 2}, {'column': 'value', 'operator': '==', 'value': "aa"}]
Is there any way to achieve this?
Thanks in advance.
You can use pyspark.sql.functions.expr to make a pyspark.sql.column.Column out of a SQL expression string. For your example, something like this should work.
Given s_df's schema:
root
|-- col1: long (nullable = false)
|-- value: string (nullable = false)
Import the functions and instantiate your conditions collection:
from pyspark.sql.functions import col, expr, when
conditions = [
{'column': 'col1', 'operator': '==', 'value': 3},
{'column': 'value', 'operator': '==', 'value': "'aa'"}
]
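Note that the string value is wrapped in extra quotes ("'aa'") so that, once interpolated, it appears as a SQL string literal in the generated expression.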
new_df = s_df.withColumn('value', expr(
    f"IF({conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
    f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']},"
    " value, 2)"))
new_df.show()
Alternatively, you can pass the same expression string through expr as the condition of the when function:
new_df = s_df.withColumn('value', when(
    expr(
        f"{conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
        f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']}"
    ),
    col("value")).otherwise(2))
new_df.show()