I have a Pandas dataframe in the following format. Data has been pre-aggregated.
+---------------------------+----------+---------+-------------+-------------+
|InterfaceName |StartDate |StartHour|DocumentCount|TotalRowCount|
+---------------------------+----------+---------+-------------+-------------+
|Interface_A |2023-04-01|0 |5 |4384 |
|Interface_A |2023-04-01|1 |58 |57168 |
|Interface_B |2023-04-01|1 |1 |136 |
|Interface_C |2023-04-01|1 |1 |131 |
|Interface_A |2023-04-02|0 |58 |57168 |
|Interface_B |2023-04-02|0 |1 |131 |
|Interface_C |2023-04-02|0 |1 |136 |
|Interface_A |2023-04-02|1 |2 |1657 |
|Interface_B |2023-04-02|1 |2 |1539 |
|Interface_C |2023-04-02|1 |2 |1657 |
+---------------------------+----------+---------+-------------+-------------+
Using PySpark, how can I transform the dataframe so that the schema appears as follows, then write it to a structured collection in MongoDB?
root
|-- StartDate: date (nullable = true)
|-- StartHour: integer (nullable = true)
|-- InterfaceSummary: struct (nullable = false)
| |-- InterfaceName: string (nullable = true)
| |-- DocumentCount: string (nullable = true)
| |-- TotalRowCount: string (nullable = true)
Thanks in advance,
Ben.
See the implementation below. (I have created the Spark dataframe directly using the input data you've shared, but to create the Spark dataframe explicitly from a pandas dataframe you could use this code:
df = spark.createDataFrame(pdf)
Here pdf will be your pandas dataframe.)
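For completeness, here is a minimal sketch of that conversion, assuming an active SparkSession named spark; the pdf below is a hypothetical reconstruction of the first few sample rows, not part of your original code:
import pandas as pd

# Hypothetical pandas dataframe built from the first few sample rows
pdf = pd.DataFrame({
    "InterfaceName": ["Interface_A", "Interface_A", "Interface_B"],
    "StartDate": ["2023-04-01", "2023-04-01", "2023-04-01"],
    "StartHour": [0, 1, 1],
    "DocumentCount": [5, 58, 1],
    "TotalRowCount": [4384, 57168, 136],
})

# Spark infers the schema from the pandas dtypes
df = spark.createDataFrame(pdf)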
Input Data -
from pyspark.sql.types import *
schema = StructType([
    StructField("InterfaceName", StringType(), True),
    StructField("StartDate", StringType(), True),
    StructField("StartHour", IntegerType(), True),
    StructField("DocumentCount", IntegerType(), True),
    StructField("TotalRowCount", IntegerType(), True)
])

data = [
    ("Interface_A", "2023-04-01", 0, 5, 4384),
    ("Interface_A", "2023-04-01", 1, 58, 57168),
    ("Interface_B", "2023-04-01", 1, 1, 136),
    ("Interface_C", "2023-04-01", 1, 1, 131),
    ("Interface_A", "2023-04-02", 0, 58, 57168),
    ("Interface_B", "2023-04-02", 0, 1, 131),
    ("Interface_C", "2023-04-02", 0, 1, 136),
    ("Interface_A", "2023-04-02", 1, 2, 1657),
    ("Interface_B", "2023-04-02", 1, 2, 1539),
    ("Interface_C", "2023-04-02", 1, 2, 1657)
]
df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)
+-------------+----------+---------+-------------+-------------+
|InterfaceName|StartDate |StartHour|DocumentCount|TotalRowCount|
+-------------+----------+---------+-------------+-------------+
|Interface_A |2023-04-01|0 |5 |4384 |
|Interface_A |2023-04-01|1 |58 |57168 |
|Interface_B |2023-04-01|1 |1 |136 |
|Interface_C |2023-04-01|1 |1 |131 |
|Interface_A |2023-04-02|0 |58 |57168 |
|Interface_B |2023-04-02|0 |1 |131 |
|Interface_C |2023-04-02|0 |1 |136 |
|Interface_A |2023-04-02|1 |2 |1657 |
|Interface_B |2023-04-02|1 |2 |1539 |
|Interface_C |2023-04-02|1 |2 |1657 |
+-------------+----------+---------+-------------+-------------+
Transformed Schema -
from pyspark.sql.functions import *
# Cast the date/hour columns and wrap the remaining columns into a struct
df1 = df.select(
    col("StartDate").cast("date"),
    col("StartHour").cast("integer"),
    struct(
        col("InterfaceName"),
        col("DocumentCount").cast("string").alias("DocumentCount"),
        col("TotalRowCount").cast("string").alias("TotalRowCount")
    ).alias("InterfaceSummary")
)
df1.show(truncate=False)
df1.printSchema()
+----------+---------+------------------------+
|StartDate |StartHour|InterfaceSummary |
+----------+---------+------------------------+
|2023-04-01|0 |{Interface_A, 5, 4384} |
|2023-04-01|1 |{Interface_A, 58, 57168}|
|2023-04-01|1 |{Interface_B, 1, 136} |
|2023-04-01|1 |{Interface_C, 1, 131} |
|2023-04-02|0 |{Interface_A, 58, 57168}|
|2023-04-02|0 |{Interface_B, 1, 131} |
|2023-04-02|0 |{Interface_C, 1, 136} |
|2023-04-02|1 |{Interface_A, 2, 1657} |
|2023-04-02|1 |{Interface_B, 2, 1539} |
|2023-04-02|1 |{Interface_C, 2, 1657} |
+----------+---------+------------------------+
root
|-- StartDate: date (nullable = true)
|-- StartHour: integer (nullable = true)
|-- InterfaceSummary: struct (nullable = false)
| |-- InterfaceName: string (nullable = true)
| |-- DocumentCount: string (nullable = true)
| |-- TotalRowCount: string (nullable = true)
Once you have created the transformed dataframe, you can write it to your target MongoDB collection as shown below:
mongo_uri = "mongodb://<username>:<password>@<host>:<port>/<dbname>.<collectionname>"
database_name = "<dbname>"
collection_name = "<collectionname>"

# Write the transformed dataframe (df1, not the original df).
# Note: format("mongo") is the format name for the MongoDB Spark Connector 2.x/3.x;
# connector 10.x and later uses format("mongodb") instead.
df1.write.format("mongo") \
    .option("uri", mongo_uri) \
    .option("database", database_name) \
    .option("collection", collection_name) \
    .mode("append") \
    .save()
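If you want to sanity-check the write, reading the collection back through the same connector should reproduce the schema shown above. A minimal sketch, reusing the placeholder connection values and assuming the 2.x/3.x connector's format name:
df_check = spark.read.format("mongo") \
    .option("uri", mongo_uri) \
    .option("database", database_name) \
    .option("collection", collection_name) \
    .load()

df_check.printSchema()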