Tags: arrays, pyspark, apache-spark-sql, google-analytics, pivot

How To Unnest And Pivot Multiple JSON-like Structures Inside A PySpark DataFrame


I'm trying to transform raw "Event" data coming from a Google Analytics account using PySpark. Each "Event" record has a field called "event_params", which contains sub-fields in the form of key-value pairs. Here's an example record:


| event_date | event_timestamp  | event_name | event_params |
| ---------- | ---------------- | ---------- | ------------ |
| 20230207   | 1675797300185610 | Page View  | [{key=engaged_session_event, value={string_value=null, int_value=1, float_value=null, double_value=null}}, {key=ga_session_number, value={string_value=null, int_value=1, float_value=null, double_value=null}}, {key=page_title, value={string_value=BlahBlah, double_value=null}}] |
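To make the nesting in `event_params` explicit (a list of `{key, value}` records, where `value` is itself a record of typed slots), here is a minimal plain-Python sketch of a small recursive parser for that `{name=value, ...}` / `[...]` text layout. `parse_kv_text` is a hypothetical helper, not a PySpark API. It assumes names and values never contain `,`, `=`, `{`, `}`, `[` or `]`, treats the bare word `null` as `None`, and returns every other scalar as a string.

```python
from typing import Any


def parse_kv_text(text: str) -> Any:
    """Parse text like "[{key=a, value={int_value=1}}]" into lists/dicts.

    Hypothetical helper: assumes names and values contain none of , = { } [ ]
    and returns scalars as strings (e.g. "1"); casting is left to the caller.
    """
    pos = 0

    def skip_ws() -> None:
        nonlocal pos
        while pos < len(text) and text[pos].isspace():
            pos += 1

    def parse_value() -> Any:
        nonlocal pos
        skip_ws()
        if text[pos] == "[":
            return parse_seq("]", parse_value)
        if text[pos] == "{":
            return dict(parse_seq("}", parse_pair))
        # Scalar: read up to the next delimiter of the enclosing container.
        start = pos
        while pos < len(text) and text[pos] not in ",]}":
            pos += 1
        token = text[start:pos].strip()
        return None if token == "null" else token

    def parse_pair() -> tuple:
        nonlocal pos
        skip_ws()
        eq = text.index("=", pos)
        name = text[pos:eq].strip()
        pos = eq + 1
        return name, parse_value()

    def parse_seq(close_ch: str, parse_item) -> list:
        nonlocal pos
        pos += 1  # consume the opening bracket/brace (checked by the caller)
        items = []
        skip_ws()
        if text[pos] == close_ch:
            pos += 1
            return items
        while True:
            items.append(parse_item())
            skip_ws()
            if text[pos] == ",":
                pos += 1
            elif text[pos] == close_ch:
                pos += 1
                return items
            else:
                raise ValueError(f"unexpected {text[pos]!r} at offset {pos}")

    result = parse_value()
    skip_ws()
    if pos != len(text):
        raise ValueError(f"trailing text at offset {pos}")
    return result
```

Applied to the sample value, this yields three dicts such as `{"key": "page_title", "value": {"string_value": "BlahBlah", "double_value": None}}`. Wrapping something like it in a Python UDF is one possible route in Spark, at the cost of Python serialization overhead on large tables.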

I'd like the final dataset to look something like this:

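| event_date | event_timestamp  | event_name | engaged_session_event | ga_session_number | page_title |
| ---------- | ---------------- | ---------- | --------------------- | ----------------- | ---------- |
| 20230207   | 1675797300185610 | Page View  | 1                     | 1                 | BlahBlah   |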

I've tried converting the "event_params" field to ArrayType, StructType, and a JSON string, but I can't even extract the individual "key" fields. Once I can do that, I need to discard the null values and pivot the remaining key-value pairs into new fields.
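The target transformation (take the non-null typed value for each key, drop keys that have no value, turn the keys into columns) can be sketched in plain Python on a single event, assuming `event_params` has already been turned into a list of `{key, value}` dicts. `first_non_null` and `pivot_event` are illustrative names, not Spark APIs; in Spark the same two steps correspond to `coalesce` over the four typed fields followed by `groupBy(...).pivot(...)`.

```python
from typing import Any, Optional

# Typed slots in the sample's value struct; at most one is expected to be
# non-null per parameter (an assumption based on the sample record).
VALUE_FIELDS = ("string_value", "int_value", "float_value", "double_value")


def first_non_null(value: dict) -> Optional[Any]:
    """Plain-Python analogue of coalesce(string_value, int_value, float_value, double_value)."""
    for field in VALUE_FIELDS:
        if value.get(field) is not None:  # missing slots count as null
            return value[field]
    return None


def pivot_event(event: dict) -> dict:
    """Flatten one event: keep top-level fields, turn each param key into a field."""
    row = {k: v for k, v in event.items() if k != "event_params"}
    for param in event["event_params"]:
        v = first_non_null(param["value"])
        if v is not None:  # discard params whose typed slots are all null
            row[param["key"]] = v
    return row
```

One difference from a Spark pivot: here a key with no value is simply omitted, whereas a pivoted DataFrame would still have that column, filled with null.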


Solution

  • Try this:

    # Import the functions and types used below
    from pyspark.sql.functions import coalesce, col, explode, from_json, regexp_replace, split
    from pyspark.sql.functions import max as max_  # avoid shadowing Python's built-in max
    from pyspark.sql.types import MapType, StringType

    # 1. Split the event_params string into one row per key/value record
    df = df \
        .withColumn("event_params", regexp_replace("event_params", r"\[", "")) \
        .withColumn("event_params", regexp_replace("event_params", r"\]", "")) \
        .withColumn("event_params", regexp_replace("event_params", r"\}\},", "}}|")) \
        .withColumn("event_params", split("event_params", r"\|")) \
        .withColumn("event_params", explode("event_params"))

    # 2. Rewrite each record as single-quoted JSON, e.g.
    #    {'key':'page_title','value':{'string_value':'BlahBlah','double_value':null}}
    #    (removing spaces also removes any spaces inside values such as page titles)
    df = df \
        .withColumn("event_params", regexp_replace("event_params", " ", "")) \
        .withColumn("event_params", regexp_replace("event_params", "=", ":")) \
        .withColumn("event_params", regexp_replace("event_params", ",", "',")) \
        .withColumn("event_params", regexp_replace("event_params", r"\{", "{'")) \
        .withColumn("event_params", regexp_replace("event_params", r"\}\}", "'}}")) \
        .withColumn("event_params", regexp_replace("event_params", ",", ",'")) \
        .withColumn("event_params", regexp_replace("event_params", ":", "':'")) \
        .withColumn("event_params", regexp_replace("event_params", r":'\{", ":{")) \
        .withColumn("event_params", regexp_replace("event_params", "'null'", "null"))

    # 3. Parse the JSON into a map; the nested "value" object comes back as a
    #    JSON string, so parse it into a second map
    json_map = MapType(StringType(), StringType())
    df = df \
        .withColumn("event_params", from_json("event_params", json_map)) \
        .withColumn("event_params_key", col("event_params").getItem("key")) \
        .withColumn("event_params_value", from_json(col("event_params").getItem("value"), json_map))

    # 4. Keep the first non-null typed value, then pivot the keys into columns
    df = df.withColumn("event_params_value", coalesce(
        "event_params_value.string_value",
        "event_params_value.int_value",
        "event_params_value.float_value",
        "event_params_value.double_value",
    ))

    df = df.groupBy("event_date", "event_timestamp", "event_name") \
        .pivot("event_params_key") \
        .agg(max_("event_params_value"))

    df.show(truncate=False)
    

    Output:

    +----------+----------------+----------+---------------------+-----------------+----------+
    |event_date|event_timestamp |event_name|engaged_session_event|ga_session_number|page_title|
    +----------+----------------+----------+---------------------+-----------------+----------+
    |20230207  |1675797300185610|Page View |1                    |1                |BlahBlah  |
    +----------+----------------+----------+---------------------+-----------------+----------+
    

    Note: I'm not very good at writing regular expressions. If anyone can simplify the regex chain in step 2, feel free to edit the code.
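On the request for tidier regular expressions: one option is to skip the rewrite-to-JSON chain and pull the pairs out directly with two patterns, one matching each `{key=..., value={...}}` record and one picking the non-null typed slot inside it. The sketch below uses Python's `re` so it can be tried on the sample string. `PARAM_RE`, `SLOT_RE` and `extract_params` are hypothetical names; the patterns assume values contain no commas or braces and have only been checked against the sample record. Java regex, which Spark uses, supports the same constructs (`\w`, `\b`, negative lookahead), so the patterns might be adapted for Spark SQL's `regexp_extract_all` (Spark 3.1+), but that port is untested here.

```python
import re

# Hypothetical patterns, written against the sample record only.
# One match per parameter: group 1 = key, group 2 = body of the value struct.
PARAM_RE = re.compile(r"\{\s*key=([^,}]+),\s*value=\{([^}]*)\}\s*\}")
# One match per typed slot whose value is not the bare word null.
SLOT_RE = re.compile(r"(\w+_value)=(?!null\b)([^,]*)")


def extract_params(raw: str) -> dict:
    """Map each parameter key to its first non-null typed value, as a string."""
    params = {}
    for key, body in PARAM_RE.findall(raw):
        slots = SLOT_RE.findall(body)
        if slots:  # parameters whose slots are all null are skipped
            params[key.strip()] = slots[0][1].strip()
    return params
```

On the sample `event_params` string this returns `{'engaged_session_event': '1', 'ga_session_number': '1', 'page_title': 'BlahBlah'}`. "First" here means first in textual order, which matches the `coalesce` order only because the sample lists the slots as string, int, float, double.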