Tags: python, apache-spark, pyspark, apache-spark-sql, pyspark-schema

Convert each key-value pair to columns of a DataFrame in PySpark


I have the following array of maps, and I want to convert it into an array of structs so that all key-value pairs become columns of a DataFrame:

|-- DurationPeriod: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

Expected structure:

|-- transform_col: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: string (nullable = true)

Here is sample data for the array of maps. Each map may be empty, or it may have more than two key-value pairs; it varies for each occurrence.

[{eod -> 2023-06-14, Id -> 123456789}, {eod -> 2028-11-17, Id -> 123456788}]

I am trying to convert each map to a struct, since a map does not support converting its key-value pairs to columns. Please suggest a solution, ideally one that converts the map's key-value pairs to DataFrame columns directly.

I tried the following code, but it does not pick up the keys and values; it just returns nulls:

df1 = table.select("*",expr("transform(DurationPeriod, x -> struct(x.key as key,x.value as value))").alias("transform_col"))

[{null, null}, {null, null}]

Your help resolving this issue would be highly appreciated. Please let me know what I am missing here.

Update

Here is the actual conversion I am looking for:

[screenshot of the expected conversion omitted]


Solution

    from pyspark.sql import SparkSession
    from pyspark.sql.types import (StructField, StructType, 
        StringType, MapType, ArrayType
    )
    from pyspark.sql.functions import explode
    
    # Define the schema
    schema = StructType([
        StructField('properties', ArrayType(MapType(StringType(), StringType())), True)
    ])
    
    # Initialize SparkSession
    spark = SparkSession.builder.appName('SparkExample').getOrCreate()
    
    # Create DataFrame
    dataDictionary = [
        ([{"eod": "2023-06-14", "Id": "123456789"},
           {"eod": "2023-05-14", "Id": "23456789"},
           {"eod": "2023-04-14", "Id": "456789y"}],),
        ([{"eod": "2022-06-14", "Id": "123"},
           {"eod": "2022-05-14", "Id": "6789"}],),
        ([{"eod": "2022-06-14"}],),
        ([],)
    ]
    
    df = spark.createDataFrame(data=dataDictionary, schema=schema)
    df.show(truncate=False)
    
    # Explode the array so each map becomes its own row
    df = df.select(explode("properties").alias("properties"))
    # Pull the map values out by key into top-level columns
    df = df.select(df['properties.eod'].alias('eod'), df['properties.Id'].alias('Id'))
    df.show()
    
    

    Original Data:

    +---------------------------------------------------------------------------------------------------------------+
    |properties                                                                                                     |
    +---------------------------------------------------------------------------------------------------------------+
    |[{Id -> 123456789, eod -> 2023-06-14}, {Id -> 23456789, eod -> 2023-05-14}, {Id -> 456789y, eod -> 2023-04-14}]|
    |[{Id -> 123, eod -> 2022-06-14}, {Id -> 6789, eod -> 2022-05-14}]                                              |
    |[{eod -> 2022-06-14}]                                                                                          |
    |[]                                                                                                             |
    +---------------------------------------------------------------------------------------------------------------+
    

    Transformed Output:

    +----------+---------+
    |       eod|       Id|
    +----------+---------+
    |2023-06-14|123456789|
    |2023-05-14| 23456789|
    |2023-04-14|  456789y|
    |2022-06-14|      123|
    |2022-05-14|     6789|
    |2022-06-14|     null|
    +----------+---------+
    

    Not sure if I'm missing something but is this what you're looking for?
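One caveat about the approach above: explode drops rows whose array is empty, which is why the ([],) row does not appear in the transformed output. If you need to keep such rows (with null values in the resulting columns), explode_outer should do it. A minimal sketch of that substitution:

    from pyspark.sql.functions import explode_outer

    # explode_outer keeps rows with empty or null arrays,
    # emitting a single row with a null map for them
    df = df.select(explode_outer("properties").alias("properties"))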
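As for why your original transform returned nulls: inside the lambda, x is a map, so x.key is not a struct-field reference but a lookup of the literal key 'key', which your maps do not contain. If you specifically want the array-of-structs shape from your expected schema, map_entries converts each map into an array of <key, value> structs. A sketch, assuming your DataFrame is the table with the DurationPeriod column from your question:

    from pyspark.sql.functions import expr

    # map_entries turns each map into an array<struct<key,value>>;
    # flatten merges the per-map arrays into one array per row
    df1 = table.select(
        "*",
        expr("flatten(transform(DurationPeriod, x -> map_entries(x)))").alias("transform_col")
    )

From there you can explode transform_col and select the key/value fields out into columns, as in the answer above.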