mongodb, apache-spark, pyspark

PySpark / MongoDB Dataframe to Nested Collection


I have a Pandas dataframe in the following format. Data has been pre-aggregated.

+---------------------------+----------+---------+-------------+-------------+
|InterfaceName              |StartDate |StartHour|DocumentCount|TotalRowCount|
+---------------------------+----------+---------+-------------+-------------+
|Interface_A                |2023-04-01|0        |5            |4384         |
|Interface_A                |2023-04-01|1        |58           |57168        |
|Interface_B                |2023-04-01|1        |1            |136          |
|Interface_C                |2023-04-01|1        |1            |131          |
|Interface_A                |2023-04-02|0        |58           |57168        |
|Interface_B                |2023-04-02|0        |1            |131          |
|Interface_C                |2023-04-02|0        |1            |136          |
|Interface_A                |2023-04-02|1        |2            |1657         |
|Interface_B                |2023-04-02|1        |2            |1539         |
|Interface_C                |2023-04-02|1        |2            |1657         |
+---------------------------+----------+---------+-------------+-------------+

Using PySpark, how can I transform the dataframe so that the schema appears as follows, then write to a structured collection in MongoDB?

root
 |-- StartDate: date (nullable = true)
 |-- StartHour: integer (nullable = true)
 |-- InterfaceSummary: struct (nullable = false)
 |    |-- InterfaceName: string (nullable = true)
 |    |-- DocumentCount: string (nullable = true)
 |    |-- TotalRowCount: string (nullable = true)

Thanks in advance,

Ben.


Solution

  • See the implementation below.
    (I have created the Spark dataframe directly from the input data you've shared. To explicitly create a Spark dataframe from your pandas dataframe, you could use
    df = spark.createDataFrame(pdf)
    where pdf is your pandas dataframe; a short sketch of this conversion follows.)
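
    For reference, a minimal, hedged sketch of that pandas-to-Spark conversion. It assumes an existing SparkSession named spark; the two-row pandas dataframe here is just a stand-in for your pre-aggregated data:

    import pandas as pd
    
    pdf = pd.DataFrame(
        [
            ("Interface_A", "2023-04-01", 0, 5, 4384),
            ("Interface_B", "2023-04-01", 1, 1, 136),
        ],
        columns=["InterfaceName", "StartDate", "StartHour", "DocumentCount", "TotalRowCount"],
    )
    
    # Spark infers the schema from the pandas dtypes; pass an explicit schema
    # (as in the next snippet) if you need tighter control over the types.
    df = spark.createDataFrame(pdf)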

    Input Data -

    from pyspark.sql.types import *
    
    schema = StructType([
        StructField("InterfaceName", StringType(), True),
        StructField("StartDate", StringType(), True),
        StructField("StartHour", IntegerType(), True),
        StructField("DocumentCount", IntegerType(), True),
        StructField("TotalRowCount", IntegerType(), True)
    ])
    
    data = [
        ("Interface_A", "2023-04-01", 0, 5, 4384),
        ("Interface_A", "2023-04-01", 1, 58, 57168),
        ("Interface_B", "2023-04-01", 1, 1, 136),
        ("Interface_C", "2023-04-01", 1, 1, 131),
        ("Interface_A", "2023-04-02", 0, 58, 57168),
        ("Interface_B", "2023-04-02", 0, 1, 131),
        ("Interface_C", "2023-04-02", 0, 1, 136),
        ("Interface_A", "2023-04-02", 1, 2, 1657),
        ("Interface_B", "2023-04-02", 1, 2, 1539),
        ("Interface_C", "2023-04-02", 1, 2, 1657)
    ]
    
    df = spark.createDataFrame(data, schema=schema)
    df.show(truncate=False)
    
    +-------------+----------+---------+-------------+-------------+
    |InterfaceName|StartDate |StartHour|DocumentCount|TotalRowCount|
    +-------------+----------+---------+-------------+-------------+
    |Interface_A  |2023-04-01|0        |5            |4384         |
    |Interface_A  |2023-04-01|1        |58           |57168        |
    |Interface_B  |2023-04-01|1        |1            |136          |
    |Interface_C  |2023-04-01|1        |1            |131          |
    |Interface_A  |2023-04-02|0        |58           |57168        |
    |Interface_B  |2023-04-02|0        |1            |131          |
    |Interface_C  |2023-04-02|0        |1            |136          |
    |Interface_A  |2023-04-02|1        |2            |1657         |
    |Interface_B  |2023-04-02|1        |2            |1539         |
    |Interface_C  |2023-04-02|1        |2            |1657         |
    +-------------+----------+---------+-------------+-------------+
    

    Transformed Schema -

    from pyspark.sql.functions import *
    
    df1 = df.select(
                    col("StartDate").cast("Date"),
                    col("StartHour").cast("Integer"),
                    struct(
                      col("InterfaceName"),
                      col("DocumentCount").cast("String").alias("DocumentCount"),
                      col("TotalRowCount").cast("String").alias("TotalRowCount")
                    ).alias("InterfaceSummary")
    )
    df1.show(truncate=False)
    df1.printSchema()
    
    +----------+---------+------------------------+
    |StartDate |StartHour|InterfaceSummary        |
    +----------+---------+------------------------+
    |2023-04-01|0        |{Interface_A, 5, 4384}  |
    |2023-04-01|1        |{Interface_A, 58, 57168}|
    |2023-04-01|1        |{Interface_B, 1, 136}   |
    |2023-04-01|1        |{Interface_C, 1, 131}   |
    |2023-04-02|0        |{Interface_A, 58, 57168}|
    |2023-04-02|0        |{Interface_B, 1, 131}   |
    |2023-04-02|0        |{Interface_C, 1, 136}   |
    |2023-04-02|1        |{Interface_A, 2, 1657}  |
    |2023-04-02|1        |{Interface_B, 2, 1539}  |
    |2023-04-02|1        |{Interface_C, 2, 1657}  |
    +----------+---------+------------------------+
    
    root
     |-- StartDate: date (nullable = true)
     |-- StartHour: integer (nullable = true)
     |-- InterfaceSummary: struct (nullable = false)
     |    |-- InterfaceName: string (nullable = true)
     |    |-- DocumentCount: string (nullable = true)
     |    |-- TotalRowCount: string (nullable = true)
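
    If the "nested collection" you are after is one document per StartDate/StartHour with all interfaces collected into an array (rather than one document per interface, as above), a hedged variation using groupBy and collect_list could look like the sketch below. The column name InterfaceSummaries is my own choice, not from your post:

    from pyspark.sql.functions import col, struct, collect_list
    
    df2 = (
        df.groupBy(
            col("StartDate").cast("Date").alias("StartDate"),
            col("StartHour").cast("Integer").alias("StartHour"),
        )
        .agg(
            collect_list(
                struct(
                    col("InterfaceName"),
                    col("DocumentCount").cast("String").alias("DocumentCount"),
                    col("TotalRowCount").cast("String").alias("TotalRowCount"),
                )
            ).alias("InterfaceSummaries")  # array<struct>, one element per interface
        )
    )
    df2.printSchema()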
    
    

    Once you have created the transformed dataframe, you can write it to your target MongoDB collection along these lines (note that it is the transformed dataframe df1 being written):

    mongo_uri = "mongodb://<username>:<password>@<host>:<port>/<dbname>.<collectionname>"
    database_name = "<dbname>"
    collection_name = "<collectionname>"
    
    df1.write.format("mongo") \
        .option("uri", mongo_uri) \
        .option("database", database_name) \
        .option("collection", collection_name) \
        .mode("append") \
        .save()
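
    Note that "mongo" is the format name used by the older MongoDB Spark Connector (2.x/3.x). If you are on the 10.x connector, the format name is "mongodb" and the write looks roughly like the sketch below; double-check the option names against the docs for your connector version:

    # Hedged sketch for MongoDB Spark Connector 10.x, where connection.uri,
    # database and collection are the documented write options.
    df1.write.format("mongodb") \
        .option("connection.uri", mongo_uri) \
        .option("database", database_name) \
        .option("collection", collection_name) \
        .mode("append") \
        .save()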