pyspark logical-operators expr

Apply logical operation on a dataframe in pyspark


I have a dataframe with many columns, and one of the columns holds a logical operation that I need to apply to the dataframe. For example, the dataframe has columns A, B and C plus a logical_operation column holding expressions such as (A&B).

For each row, I need to evaluate the expression stored in the logical_operation column against the values in that row.

Normally I can use expr() for this. But when I read the expression from a column and pass it to expr(), I get an error saying the column is not iterable.

Any suggestions?


Solution

  • You can use Python's built-in eval function inside a UDF.

    eval takes the variables available to the expression as a dict, so we first pack the data columns into a struct. The UDF receives that struct as a Row and converts it with asDict():

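    from pyspark.sql import functions as F

    # Evaluate the expression string with the row's columns as local variables.
    # No return type is given, so F.udf defaults to a string result column.
    eval_udf = F.udf(lambda op, data: eval(op, {}, data.asDict()))

    # Pack every column except the expression into a struct, then apply the UDF.
    df.withColumn('data', F.struct([df[x] for x in df.columns if x != 'logical_operation'])) \
        .withColumn('result', eval_udf(F.col('logical_operation'), F.col('data'))) \
        .show()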
    

    Output:

    +---+---+---+-----------------+---------+------+
    |  A|  B|  C|logical_operation|     data|result|
    +---+---+---+-----------------+---------+------+
    |  0|  1|  1|            (A&B)|{0, 1, 1}|     0|
    |  1|  1|  1|              (A)|{1, 1, 1}|     1|
    |  0|  0|  1|            (A|C)|{0, 0, 1}|     1|
    +---+---+---+-----------------+---------+------+
    

    eval executes arbitrary Python code, so only use this approach if the contents of logical_operation come from a trusted source.
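
If the expressions cannot be fully trusted, one option is to check each expression before evaluating it. The following is a minimal sketch in plain Python, not part of the original answer: `safe_logical_eval` and its whitelist of allowed syntax are hypothetical names chosen for illustration, and it assumes the expressions only combine column names and integers with `&`, `|` and `^`.

```python
import ast

# Syntax permitted in an expression such as "(A&B)" or "(A|C)".
# This whitelist is an assumption for illustration; adjust it to your data.
_ALLOWED_NODES = (
    ast.Expression, ast.BinOp, ast.Name, ast.Load, ast.Constant,
    ast.BitAnd, ast.BitOr, ast.BitXor,
)


def safe_logical_eval(op, row):
    """Evaluate a bitwise expression using the values in `row` (a dict)."""
    tree = ast.parse(op, mode="eval")
    for node in ast.walk(tree):
        # Reject anything outside the whitelist (calls, attributes, etc.).
        if not isinstance(node, _ALLOWED_NODES):
            raise ValueError(f"disallowed syntax in {op!r}: {type(node).__name__}")
        # Only names that exist as columns may be referenced.
        if isinstance(node, ast.Name) and node.id not in row:
            raise ValueError(f"unknown column in {op!r}: {node.id}")
        # Only integer literals are accepted as constants.
        if isinstance(node, ast.Constant) and not isinstance(node.value, int):
            raise ValueError(f"disallowed constant in {op!r}: {node.value!r}")
    code = compile(tree, "<logical_operation>", "eval")
    # Empty __builtins__ so that no built-in function is reachable.
    return eval(code, {"__builtins__": {}}, dict(row))


# The three rows from the output table above.
print(safe_logical_eval("(A&B)", {"A": 0, "B": 1, "C": 1}))  # 0
print(safe_logical_eval("(A)", {"A": 1, "B": 1, "C": 1}))    # 1
print(safe_logical_eval("(A|C)", {"A": 0, "B": 0, "C": 1}))  # 1
```

Inside the UDF, such a function could presumably take the place of the bare eval call, for example `F.udf(lambda op, data: safe_logical_eval(op, data.asDict()))`. An expression like `__import__('os')` would then raise a ValueError instead of being executed.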