Tags: python, apache-spark, pyspark, aws-glue

Rewrite PySpark function to join columns into JSON


I need to take all columns of a Spark DataFrame and build another column containing a JSON object whose keys and values are the column names and values. For example, a DataFrame like this:

C1  C2  CN
10  20  abc
99      cde
40  50

Should be transformed to this:

C1  C2  CN   JSON
10  20  abc  { "C1": 10, "C2": 20, "CN": "abc" }
99      cde  { "C1": 99, "CN": "cde" }
40  50       { "C1": 40, "C2": 50 }

The column names and their number may vary, so I can't pass them explicitly. The strategy I'm using is:

import json

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def jsonize_fields(row):
    # Keep only the columns whose values are truthy (this skips nulls,
    # empty strings, and zeros).
    fields = {}
    for k, v in row.asDict().items():
        if v:
            fields[k] = v
    return json.dumps(fields)

jsonize_udf = udf(jsonize_fields, StringType())
spark_data_frame = spark_data_frame.withColumn(
    'JSON', jsonize_udf(struct(*spark_data_frame.columns)))

This works well, but it degrades performance a lot, so I would like to convert it to a solution that doesn't use a UDF. Is that possible?


Solution

  • Just found it:

    from pyspark.sql.functions import struct, to_json
    
    spark_data_frame = spark_data_frame.withColumn(
        'JSON', to_json(struct(*spark_data_frame.columns)))
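
    Unlike the UDF, to_json is a built-in function that is evaluated inside the JVM, so it avoids the Python serialization round-trip that was degrading performance.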
    

    By default, to_json ignores null values (this can be changed by passing options={"ignoreNullFields": False} as a second parameter), but it does not ignore empty strings.
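
    For example, here is a minimal sketch that keeps null fields in the output. This assumes Spark 3.0+, where the ignoreNullFields option exists; the string form "false" is used here since option values are passed to the JVM as strings, though recent PySpark versions also accept the Python boolean:

    from pyspark.sql.functions import struct, to_json
    
    # Keep null columns in the JSON output instead of dropping them.
    spark_data_frame = spark_data_frame.withColumn(
        'JSON',
        to_json(struct(*spark_data_frame.columns),
                options={"ignoreNullFields": "false"}))
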
    If you want to ignore empty values as well, convert them to null first:

    from pyspark.sql.functions import col, when
    
    # Replace empty strings with nulls so to_json omits those columns.
    spark_data_frame = spark_data_frame.select(
        [when(col(c) == "", None).otherwise(col(c)).alias(c)
         for c in spark_data_frame.columns])
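
    Putting both steps together on the question's sample data, here is a minimal end-to-end sketch. The SparkSession setup and literal values are assumptions for illustration, and the columns are strings here, so the JSON values come out quoted:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, when, struct, to_json
    
    spark = SparkSession.builder.getOrCreate()
    
    # Sample rows mirroring the question's table; empty strings mark
    # missing values.
    df = spark.createDataFrame(
        [("10", "20", "abc"), ("99", "", "cde"), ("40", "50", "")],
        ["C1", "C2", "CN"])
    
    # Step 1: turn empty strings into nulls so to_json drops them.
    df = df.select(
        [when(col(c) == "", None).otherwise(col(c)).alias(c)
         for c in df.columns])
    
    # Step 2: serialize the remaining non-null columns of each row.
    df = df.withColumn('JSON', to_json(struct(*df.columns)))
    df.show(truncate=False)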