I'm trying to transform raw "Event" data coming from a Google Analytics account using PySpark. Each "Event" record has a field called "event_params", which contains sub-fields in the form of key-value pairs. Here's an example record:
| event_date | event_timestamp | event_name | event_params
| -------- | -------- | -------- | ----------------------|
| 20230207 | 1675797300185610| Page View | [{key=engaged_session_event, value={string_value=null,
int_value=1, float_value=null, double_value=null}},
{key=ga_session_number, value={string_value=null,
int_value=1, float_value=null, double_value=null}},
{key=page_title, value={string_value=BlahBlah,
double_value=null}}] |
I'd like the final dataset to look something like this:
event_date | event_timestamp | event_name | engaged_session_event | ga_session_number | page_title |
---|---|---|---|---|---|
20230207 | 1675797300185610 | Page View | 1 | 1 | BlahBlah |
I've tried converting the "event_params" field to ArrayType, StructType, and a JSON string, but I'm unable to even extract the individual "key" fields. Once I'm able to do that, I will need to disregard the null "values", and pivot the remaining key-value pairs as new fields.
Try this:
Importing necessary packages
from pyspark.sql.functions import col, regexp_replace, split, regexp_extract, from_json, struct, coalesce, max
df = df \
.withColumn("event_params", regexp_replace("event_params", "\[", "")) \
.withColumn("event_params", regexp_replace("event_params", "\]", "")) \
.withColumn("event_params", regexp_replace("event_params", "\}},", "}}|")) \
.withColumn("event_params", split("event_params", "\|")) \
.withColumn("event_params", explode("event_params"))
df = df \
.withColumn("event_params", regexp_replace("event_params", " ", "")) \
.withColumn("event_params", regexp_replace("event_params", "=", ":")) \
.withColumn("event_params", regexp_replace("event_params", ",", "',")) \
.withColumn("event_params", regexp_replace("event_params", "\{", "{'")) \
.withColumn("event_params", regexp_replace("event_params", "\}}", "'}}")) \
.withColumn("event_params", regexp_replace("event_params", ",", ",'")) \
.withColumn("event_params", regexp_replace("event_params", ":", "':'")) \
.withColumn("event_params", regexp_replace("event_params", ":'\{", ":{")) \
.withColumn("event_params", regexp_replace("event_params", "'null'", "null"))
event_params
df = df \
.withColumn("event_params", from_json("event_params", MapType(StringType(), StringType()))) \
.withColumn("event_params_key", col("event_params").getField("key")) \
.withColumn("event_params_value", col("event_params").getField("value")) \
.withColumn("event_params_value", from_json("event_params_value", MapType(StringType(), StringType()))) \
df = df.withColumn("event_params_value", coalesce(
"event_params_value.string_value",
"event_params_value.int_value",
"event_params_value.float_value",
"event_params_value.double_value",
))
df = df.groupBy(["event_date", "event_timestamp", "event_name"]).pivot("event_params_key").agg(
max("event_params_value")
)
df.show(truncate=False)
Output:
+----------+----------------+----------+---------------------+-----------------+----------+
|event_date|event_timestamp |event_name|engaged_session_event|ga_session_number|page_title|
+----------+----------------+----------+---------------------+-----------------+----------+
|20230207 |1675797300185610|Page View |1 |1 |BlahBlah |
+----------+----------------+----------+---------------------+-----------------+----------+
Note: I am not very good in writing regexp
, If anyone can do so feel free to reformat the code (2).