I need to take all columns of a Spark DataFrame and create an additional column containing a JSON object whose keys and values are the column names and values. For example, a DataFrame like this:
C1 | C2 | CN |
---|---|---|
10 | 20 | abc |
99 |    | cde |
40 | 50 |     |
Should be transformed to this:
C1 | C2 | CN | JSON |
---|---|---|---|
10 | 20 | abc | { "C1": 10, "C2": 20, "CN": "abc" } |
99 |    | cde | { "C1": 99, "CN": "cde" } |
40 | 50 |     | { "C1": 40, "C2": 50 } |
The column names and their number may vary, so I can't pass them explicitly. The strategy I'm using is:
import json

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def jsonize_fields(row):
    # Keep only the fields that actually have a value
    fields = {}
    for k, v in row.asDict().items():
        if v:
            fields[k] = v
    return json.dumps(fields)
jsonize_udf = udf(jsonize_fields, StringType())
spark_data_frame = spark_data_frame.withColumn('JSON',
    jsonize_udf(struct(*spark_data_frame.columns)))
This works well, but it degrades performance a lot, so I would like to convert it to a solution that doesn't use a UDF. Is that possible?
Just found it:
from pyspark.sql.functions import to_json, struct

spark_data_frame = spark_data_frame.withColumn('JSON',
    to_json(struct(*spark_data_frame.columns)))
By default, to_json ignores null values (this can be changed by passing options={"ignoreNullFields": False} as a second parameter), but it does not ignore empty strings.
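For example, to keep the null fields in the output, the options dict can be passed like this (a sketch; note that the ignoreNullFields option needs a reasonably recent Spark version, 3.0 or later):

from pyspark.sql.functions import to_json, struct

# Keep null fields in the generated JSON instead of dropping them
spark_data_frame = spark_data_frame.withColumn('JSON',
    to_json(struct(*spark_data_frame.columns),
            options={"ignoreNullFields": False}))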
If you also want to ignore empty string values, apply this first:

from pyspark.sql.functions import col, when

spark_data_frame = spark_data_frame.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c)
     for c in spark_data_frame.columns])