Tags: python, apache-spark, pyspark, explode

PySpark - Explode columns into rows and set values based on logic


Given a DataFrame:

+---+-----------+---------+-------+------------+
| id|      score|tx_amount|isValid|    greeting|
+---+-----------+---------+-------+------------+
|  1|        0.2|    23.78|   true| hello_world|
|  2|        0.6|    12.41|  false|byebye_world|
+---+-----------+---------+-------+------------+

I want to explode these columns into rows, with each value in a single column named "col_value". That part works, but I also want to flag each resulting row with the column it came from, so that I get a result like this:

+---+------------+--------+---------+----------+-------+
| id|   col_value|is_score|is_amount|is_boolean|is_text|
+---+------------+--------+---------+----------+-------+
|  1|         0.2|       Y|        N|         N|      N|
|  1|       23.78|       N|        Y|         N|      N|
|  1|        true|       N|        N|         Y|      N|
|  1| hello_world|       N|        N|         N|      Y|
|  2|         0.6|       Y|        N|         N|      N|
|  2|       12.41|       N|        Y|         N|      N|
|  2|       false|       N|        N|         Y|      N|
|  2|byebye_world|       N|        N|         N|      Y|
+---+------------+--------+---------+----------+-------+
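Stated in plain Python (no Spark involved), the target transformation pairs every value with the name of the column it came from and derives the flags from that name, not from the value. This is a minimal sketch for illustration only; FLAG_FOR_COLUMN and explode_with_flags are hypothetical names, and the rows are the sample data from the question.

```python
# Hypothetical mapping: source column -> flag column that should be "Y" for its values.
FLAG_FOR_COLUMN = {
    "score": "is_score",
    "tx_amount": "is_amount",
    "isValid": "is_boolean",
    "greeting": "is_text",
}


def explode_with_flags(rows):
    """Emit one output row per (input row, value column) pair.

    Each output row keeps the value as a string in "col_value" and sets
    exactly one flag to "Y": the flag of the column the value came from.
    """
    out = []
    for row in rows:
        for col_name, own_flag in FLAG_FOR_COLUMN.items():
            value = row[col_name]
            # str(True) is "True"; lower-case booleans to match Spark's "true"/"false".
            text = str(value).lower() if isinstance(value, bool) else str(value)
            record = {"id": row["id"], "col_value": text}
            for flag in FLAG_FOR_COLUMN.values():
                record[flag] = "Y" if flag == own_flag else "N"
            out.append(record)
    return out


rows = [
    {"id": 1, "score": 0.2, "tx_amount": 23.78, "isValid": True, "greeting": "hello_world"},
    {"id": 2, "score": 0.6, "tx_amount": 12.41, "isValid": False, "greeting": "byebye_world"},
]

for r in explode_with_flags(rows):
    print(r["id"], r["col_value"], r["is_score"], r["is_amount"], r["is_boolean"], r["is_text"])
```

The point of the sketch is that the flag is decided per value, using information that travels with the value through the explode.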

What I have so far:

from pyspark.sql import functions as F

df.withColumn("cols", F.explode(F.arrays_zip(F.array("score", "tx_amount", "isValid", "greeting")))) \
        .select("id", F.col("cols.*")) \
        .withColumnRenamed("0", "col_value") \
        .withColumn("is_score", F.lit("Y") if col1_type == "score" else F.lit("N")) \
        .withColumn("is_amount", F.lit("Y") if col2_type == "amount" else F.lit("N")) \
        .withColumn("is_boolean", F.lit("Y") if col3_type == "boolean" else F.lit("N")) \
        .withColumn("is_text", F.lit("Y") if col4_type == "text" else F.lit("N")) \
        .show()

But the output is wrong: every flag column is "Y" on every row:

+---+------------+--------+---------+----------+-------+
| id|   col_value|is_score|is_amount|is_boolean|is_text|
+---+------------+--------+---------+----------+-------+
|  1|         0.2|       Y|        Y|         Y|      Y|
|  1|       23.78|       Y|        Y|         Y|      Y|
|  1|        true|       Y|        Y|         Y|      Y|
|  1| hello_world|       Y|        Y|         Y|      Y|
|  2|         0.6|       Y|        Y|         Y|      Y|
|  2|       12.41|       Y|        Y|         Y|      Y|
|  2|       false|       Y|        Y|         Y|      Y|
|  2|byebye_world|       Y|        Y|         Y|      Y|
+---+------------+--------+---------+----------+-------+

How can I do this after the explode to get the correct results?


Solution

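  • I think what you want can be achieved by applying a regex to col_value to decide whether each value is text, a boolean, an amount or a score. The code below looks only at the value: it takes the first run of digits (the integer part) and treats an integer part of 0 or 1 as a score and anything larger as an amount. It therefore works as long as scores stay below 2 and amounts are always 2 or more. If that is not the case (for example, an amount of 1.5), let me know and I will update the logic.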

    from pyspark.sql import functions as F

    (df.withColumn("cols", F.explode(F.arrays_zip(F.array("score", "tx_amount", "isValid", "greeting"))))
        .select("id", F.col("cols.*"))
        .withColumnRenamed("0", "col_value")
        # first run of letters, e.g. "hello" from "hello_world", "true" from "true"
        .withColumn("text", F.regexp_extract(F.col("col_value"), "([A-Za-z]+)", 1))
        # keep those letters in "boolean" only when they spell true or false
        .withColumn("boolean", F.when((F.col("text") == "true") | (F.col("text") == "false"), F.col("text")).otherwise(F.lit("")))
        # a boolean is not text, so blank out "text" in that case
        .withColumn("text", F.when(F.col("text") == F.col("boolean"), F.lit("")).otherwise(F.col("text")))
        # first run of digits, i.e. the integer part: "0" from "0.2", "23" from "23.78"
        .withColumn("numeric", F.regexp_extract(F.col("col_value"), "([0-9]+)", 1))
        .withColumn("is_text", F.when(F.col("text") != "", F.lit("Y")).otherwise(F.lit("N")))
        .withColumn("is_score", F.when(F.col("numeric") <= 1, F.lit("Y")).otherwise(F.lit("N")))
        .withColumn("is_amount", F.when(F.col("numeric") > 1, F.lit("Y")).otherwise(F.lit("N")))
        .withColumn("is_boolean", F.when(F.col("boolean") != "", F.lit("Y")).otherwise(F.lit("N")))
        .select("id", "col_value", "is_score", "is_amount", "is_boolean", "is_text")
        .show())
    
    
    +---+------------+--------+---------+----------+-------+
    | id|   col_value|is_score|is_amount|is_boolean|is_text|
    +---+------------+--------+---------+----------+-------+
    |  1|         0.2|       Y|        N|         N|      N|
    |  1|       23.78|       N|        Y|         N|      N|
    |  1|        true|       N|        N|         Y|      N|
    |  1| hello_world|       N|        N|         N|      Y|
    |  2|         0.6|       Y|        N|         N|      N|
    |  2|       12.41|       N|        Y|         N|      N|
    |  2|       false|       N|        N|         Y|      N|
    |  2|byebye_world|       N|        N|         N|      Y|
    +---+------------+--------+---------+----------+-------+
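The classification above rests on two regular expressions. For these simple patterns Python's re.search finds the same first match that Spark's regexp_extract does, so the heuristic can be checked outside Spark. This is a minimal sketch; classify is a hypothetical helper that mirrors the chain of withColumn calls, and it assumes non-ANSI Spark behaviour, where an empty "numeric" string never satisfies either comparison.

```python
import re


def classify(col_value):
    """Mirror the answer's regexp_extract logic in plain Python (hypothetical helper)."""
    letters = re.search(r"([A-Za-z]+)", col_value)
    text = letters.group(1) if letters else ""  # regexp_extract returns "" when nothing matches
    boolean = text if text in ("true", "false") else ""
    if text == boolean:  # a boolean is not counted as text
        text = ""
    digits = re.search(r"([0-9]+)", col_value)
    # Only the first run of digits is captured, i.e. the integer part (23 from "23.78").
    numeric = int(digits.group(1)) if digits else None
    return {
        "is_score": "Y" if numeric is not None and numeric <= 1 else "N",
        "is_amount": "Y" if numeric is not None and numeric > 1 else "N",
        "is_boolean": "Y" if boolean else "N",
        "is_text": "Y" if text else "N",
    }


for v in ["0.2", "23.78", "true", "hello_world", "1.5"]:
    print(v, classify(v))
```

The last value shows the limitation of matching on the value alone: an amount of 1.5 is flagged as a score, because only its integer part, 1, reaches the comparison.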