I have the following schema in PySpark:
root
|-- id: string (nullable = true)
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- seconds: decimal(38,18) (nullable = true)
|-- total_seconds: decimal(38,3) (nullable = true)
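For reference, that printSchema() output corresponds to the following StructType definition (a minimal sketch built with pyspark.sql.types):

from pyspark.sql.types import (StructType, StructField, StringType,
                               ArrayType, DecimalType)

schema = StructType([
    StructField("id", StringType(), True),
    StructField("data", ArrayType(StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("seconds", DecimalType(38, 18), True),
    ]), containsNull=True), True),
    StructField("total_seconds", DecimalType(38, 3), True),
])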
I have two PySpark dataframes that I need to join and aggregate together; both share the schema above.
Given the following input in the first dataframe:
[{
    "id": 1,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 50
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 25
    }],
    "total_seconds": 100
}, {
    "id": 2,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 100
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 200
    }],
    "total_seconds": 400
}]
In the second dataframe I have the following data:
[{
    "id": 1,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 100
    }, {
        "id": "345",
        "name": "name3",
        "seconds": 25
    }],
    "total_seconds": 400
}, {
    "id": 3,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 50
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 100
    }],
    "total_seconds": 200
}]
I would then expect this output:
[{
    "id": 1,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 150
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 25
    }, {
        "id": "345",
        "name": "name3",
        "seconds": 25
    }],
    "total_seconds": 500
}, {
    "id": 2,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 100
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 200
    }],
    "total_seconds": 400
}, {
    "id": 3,
    "data": [{
        "id": "123",
        "name": "name1",
        "seconds": 50
    }, {
        "id": "234",
        "name": "name2",
        "seconds": 100
    }],
    "total_seconds": 200
}]
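If you want to reproduce this without writing the samples to JSON files, here is a minimal sketch that builds both inputs with createDataFrame (the ids are typed as strings here to match the schema, even though the JSON above shows them unquoted, and Decimal wraps the numeric values for the decimal columns):

from decimal import Decimal
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

ddl = ("id string, "
       "data array<struct<id: string, name: string, seconds: decimal(38,18)>>, "
       "total_seconds decimal(38,3)")

df1 = spark.createDataFrame([
    ("1", [("123", "name1", Decimal(50)), ("234", "name2", Decimal(25))], Decimal(100)),
    ("2", [("123", "name1", Decimal(100)), ("234", "name2", Decimal(200))], Decimal(400)),
], ddl)

df2 = spark.createDataFrame([
    ("1", [("123", "name1", Decimal(100)), ("345", "name3", Decimal(25))], Decimal(400)),
    ("3", [("123", "name1", Decimal(50)), ("234", "name2", Decimal(100))], Decimal(200)),
], ddl)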
Essentially, I need to join the two dataframes on id, merge the data arrays by summing seconds for entries with matching inner ids (keeping unmatched entries as they are), and sum the total_seconds values.
You can solve this by joining the two dataframes; explode() and groupBy() also help to reshape the data before and after the join. Here is the tested code. You can add show() calls between the transformations if anything is not clear, or leave a comment below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, struct, coalesce, lit, collect_list

spark = SparkSession.builder.master("local[*]").getOrCreate()

df1 = spark.read.option("multiline", "true").json("json1.json")
# Rename df2's columns up front so they don't collide with df1's after the join.
df2 = spark.read.option("multiline", "true").json("json2.json") \
    .withColumnRenamed("data", "data_2") \
    .withColumnRenamed("id", "id_2") \
    .withColumnRenamed("total_seconds", "total_seconds_2")

# One row per inner struct, so entries with the same inner id can be matched.
df1_exploded = df1.withColumn("data", explode(col("data")))
df2_exploded = df2.withColumn("data_2", explode(col("data_2"))).drop("total_seconds_2")

# Full outer join on the outer id and the inner struct id; coalesce() keeps
# the values from whichever side is present, and missing seconds count as 0.
resultDf = df1_exploded.join(
        df2_exploded,
        (df1_exploded.id == df2_exploded.id_2) & (df1_exploded.data.id == df2_exploded.data_2.id),
        "outer") \
    .withColumn("id", coalesce(col("id"), col("id_2"))) \
    .withColumn("data", struct(
        coalesce(col("data.id"), col("data_2.id")).alias("id"),
        coalesce(col("data.name"), col("data_2.name")).alias("name"),
        (coalesce(col("data.seconds"), lit(0)) + coalesce(col("data_2.seconds"), lit(0))).alias("seconds"))) \
    .select("id", "data") \
    .groupBy("id").agg(collect_list("data").alias("data"))

# total_seconds is summed at the top level, from the unexploded dataframes.
total_seconds_df = df1.join(df2, df1.id == df2.id_2, "outer") \
    .withColumn("id", coalesce(col("id"), col("id_2"))) \
    .withColumn("total_seconds",
                coalesce(col("total_seconds"), lit(0)) + coalesce(col("total_seconds_2"), lit(0))) \
    .select("id", "total_seconds")

resultDf = resultDf.join(total_seconds_df, ["id"], "left")
resultDf.show(truncate=False)
Result:
+---+-------------------------------------------------------+-------------+
|id |data |total_seconds|
+---+-------------------------------------------------------+-------------+
|1 |[{123, name1, 150}, {234, name2, 25}, {345, name3, 25}]|500 |
|3 |[{123, name1, 50}, {234, name2, 100}] |200 |
|2 |[{123, name1, 100}, {234, name2, 200}] |400 |
+---+-------------------------------------------------------+-------------+
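Note that groupBy() does not guarantee row order, which is why id 3 comes back before id 2 above. If you need the rows ordered as in your expected output, sort explicitly before showing:

resultDf.orderBy("id").show(truncate=False)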