Search code examples
pysparkapache-spark-sqldatabricks

How to work with complex data type in Pyspark


I have a dataframe with a column named attributes, whose type is

|-- attributes: string (nullable = true)

In that column I have values like the following:

[
    {'name': 'sfcc.created_by', 'type': 'STRING', 'value': 'DummyUser'},
    {'name': 'shippingLines', 'type': 'JSON', 'value': {'code': 'DUMMYCODE', 'price': '0', 'discountedPrice': '0'}},
    {'name': 'sourceChannel', 'type': 'STRING', 'value': 'dummyOS'},
    {'name': 'leaveAtDoor', 'type': 'BOOLEAN', 'value': False},
    {'name': 'actualCreateDate', 'type': 'STRING', 'value': '2024-01-01T00:00:00.000Z'},
    {'name': 'orderCompleteDate', 'type': 'STRING', 'value': '2024-01-02T00:00:00.000Z'},
    {'name': 'sfcc.order_date', 'type': 'STRING', 'value': '2024-01-01T00:00:00.000Z'},
    {'name': 'orderSuccessfullyCreatedInSAP', 'type': 'BOOLEAN', 'value': True},
    {'name': 'splitPayments', 'type': 'JSON', 'value': [{'price': '100', 'gateway': 'DUMMYGATEWAY', 'paymentGatewayName': 'DUMMY_CARD'}]},
    {'name': 'sfcc.customer_locale', 'type': 'STRING', 'value': 'en_US'},
    {'name': 'customerId', 'type': 'STRING', 'value': '000DummyID0000'},
    {'name': 'financialStatus', 'type': 'STRING', 'value': 'DUMMYSTATUS'},
    {'name': 'addressLine1', 'type': 'STRING', 'value': 'Dummy Street 1'},
    {'name': 'sourceLocation', 'type': 'STRING', 'value': 'DUMMYWEBSITE'},
    {'name': 'addressLine2', 'type': 'STRING', 'value': 'Dummy Street 2'},
    {'name': 'subtotalPrice', 'type': 'STRING', 'value': '100'},
    {'name': 'carrierId', 'type': 'STRING', 'value': 'DummyCarrier'},
    {'name': 'isCancelable', 'type': 'BOOLEAN', 'value': False},
    {'name': 'consignments', 'type': 'JSON', 'value': ['0000-0000-0000-0000']},
    {'name': 'sfcc.shipments', 'type': 'JSON', 'value': {'shipments': [{'items': [{'skuRef': '0000000000', 'quantity': 1}], 'isGift': False, 'shipmentId': '00000000', 'giftMessage': None, 'shippingAddress': {'area': 'Dummy Area', 'city': 'Dummy City', 'name': 'John Doe', 'phone': '+00000000000', 'street': 'Dummy Street', 'country': 'XX', 'lastName': 'Doe', 'postcode': '00000', 'firstName': 'John'}, 'shippingMethodRef': 'XXX000'}]}}
]

I want to get each attribute as a separate column, like this:

sfcc.created_by  shippingLines
DummyUser        {'code': 'DUMMYCODE', 'price': '0', 'discountedPrice': '0'}


Solution

  • from pyspark.sql.functions import (
        regexp_replace,
        from_json,
        schema_of_json,
        map_from_entries,
        col,
        expr,
        explode,
    )
    
    # Create the dataframe.
    
    # Two sample rows for the "attributes" column.  Each row is a 1-tuple
    # whose single element is one long string holding a Python-repr-style
    # list of {name, type, value} dicts.
    # NOTE(review): the payload mixes single/double quotes and Python
    # literals (False/True/None), so it is not valid JSON as-is and is
    # cleaned up before parsing.
    data = [
        (
            """[{"name": "sfcc.created_by", "type": "STRING", "value": "XXXXX"},
     {'name': 'shippingLines', 'type': 'JSON', 'value': {'code': 'XXXXX', 'price': '0', 'discountedPrice': '0'}}, 
     {'name': 'sourceChannel', 'type': 'STRING', 'value': 'XXXXX'}, 
     {'name': 'leaveAtDoor', 'type': 'BOOLEAN', 'value': False}, 
     {'name': 'actualCreateDate', 'type': 'STRING', 'value': '2020-04-13T14:02:21.529Z'}, 
     {'name': 'orderCompleteDate', 'type': 'STRING', 'value': '2020-04-15T11:16:17.086Z'}, 
     {'name': 'sfcc.order_date', 'type': 'STRING', 'value': '2020-04-13T14:01:36.000Z'}, 
     {'name': 'orderSuccessfullyCreatedInSAP', 'type': 'BOOLEAN', 'value': True},
     {'name': 'splitPayments', 'type': 'JSON', 'value': [{'price': 'XXXXX', 'gateway': 'XXXXX', 'paymentGatewayName': 'XXXXX'}]}, 
     {'name': 'sfcc.customer_locale', 'type': 'STRING', 'value': 'XXXXX'}, 
     {'name': 'customerId', 'type': 'STRING', 'value': 'XXXXX'}, 
     {'name': 'financialStatus', 'type': 'STRING', 'value': 'XXXXX'},
     {'name': 'addressLine1', 'type': 'STRING', 'value': 'XXXXX'}, 
     {'name': 'sourceLocation', 'type': 'STRING', 'value': 'XXXXX'}, 
     {'name': 'addressLine2', 'type': 'STRING', 'value': 'XXXXX'},
     {'name': 'subtotalPrice', 'type': 'STRING', 'value': 'XXXXX'}, 
     {'name': 'carrierId', 'type': 'STRING', 'value': 'XXXXX'}, 
     {'name': 'isCancelable', 'type': 'BOOLEAN', 'value': False}, 
     {'name': 'consignments', 'type': 'JSON', 'value': ['XXXXX']},
     {'name': 'sfcc.shipments', 'type': 'JSON', 'value': {'shipments': [{'items': [{'skuRef': 'XXXXX', 'quantity': 1}], 'isGift': False, 'shipmentId': 'XXXXX', 'giftMessage': None, 'shippingAddress': {'area': 'Other', 'city': 'XXXXX', 'name': 'XXXXX', 'phone': '+910000000', 'street': 'XXXXX', 'country': 'XXXXX', 'lastName': 'XXXXX', 'postcode': None, 'firstName': 'XXXXX'}, 'shippingMethodRef': 'XXXXX'}]}}    
     ]
    """,
        ),
    (
            """[{"name": "sfcc.created_by", "type": "STRING", "value": "AAAAA"},
     {'name': 'shippingLines', 'type': 'JSON', 'value': {'code': 'BBBBB', 'price': '0', 'discountedPrice': '0'}}, 
     {'name': 'sourceChannel', 'type': 'STRING', 'value': 'CCCC'}, 
     {'name': 'leaveAtDoor', 'type': 'BOOLEAN', 'value': False}, 
     {'name': 'actualCreateDate', 'type': 'STRING', 'value': '2020-04-13T14:02:21.529Z'}
     ]
    """,
        )    
    ]
    # A single string column named "attributes".
    columns = ["attributes"]
    
    # NOTE(review): assumes an active SparkSession bound to `spark`
    # (Databricks / the pyspark shell provide one automatically).
    df = spark.createDataFrame(data, schema=columns)
    

    EDIT: The sample data is not valid JSON, so it must be cleaned up first. The rest of the code works as is.

    # Normalize the Python literals (False/True/None) to their JSON
    # spellings (false/true/null) so from_json can parse the payload.
    # Word boundaries (\b) keep the replacement from corrupting data
    # values that merely CONTAIN those words (e.g. "Falsetto", "NoneSuch"),
    # which a bare substring replace would rewrite.
    # NOTE: a string value that is exactly the word False/True/None would
    # still be rewritten — acceptable here, where those words only occur
    # as Python literals.
    for py_literal, json_literal in (
        ("False", "false"),
        ("True", "true"),
        ("None", "null"),
    ):
        df = df.withColumn(
            "attributes",
            regexp_replace("attributes", rf"\b{py_literal}\b", json_literal),
        )
    

    Assuming that your data is in the dataframe df, you can parse the data into a Map column:

    # Convert the raw JSON string into a map<name, value> column in three
    # steps: infer a schema from one sample payload, parse the string with
    # that schema, then collapse the resulting array of structs into a MAP.
    # NOTE(review): the schema is inferred from the FIRST row only — this
    # assumes the first row exhibits every field shape present in the
    # data; confirm against the real dataset.
    sample_payload = df.select(col("attributes")).first()[0]
    inferred_schema = schema_of_json(sample_payload)
    
    df = df.withColumn("attributes", from_json(col("attributes"), inferred_schema))
    df = df.withColumn(
        "attributes", expr("transform(attributes, x -> struct(x.name, x.value))")
    )
    df = df.withColumn("attributes", map_from_entries("attributes"))
    

    Once you have the data in a Map type column, you can extract the keys and then select the keys as individual columns:

    # Collect the distinct attribute names; each one becomes an output column.
    # A plain collect() + comprehension replaces the original
    # .rdd.flatMap(lambda x: x).collect(): dropping to the RDD API is
    # unnecessary here and is unsupported on Spark Connect / shared
    # Databricks clusters.
    keys = [
        row["key"]
        for row in df.select(explode("attributes")).select("key").distinct().collect()
    ]
    
    # Pivot the map into one column per key.
    # NOTE: keys such as "sfcc.created_by" contain dots, so any later
    # reference to these columns by name needs backtick quoting,
    # e.g. col("`sfcc.created_by`").
    df = df.select(*[col("attributes").getItem(k).alias(k) for k in keys])
    df.show()
    

    Result:

    +--------------------+------------+---------+--------------------+-------------+-----------------------------+--------------------+--------------------+--------------------+------------+---------------+--------------+--------------------+----------+---------------+--------------------+-------------+------------+-----------+------------+
    |sfcc.customer_locale|isCancelable|carrierId|   orderCompleteDate|subtotalPrice|orderSuccessfullyCreatedInSAP|    actualCreateDate|     sfcc.order_date|       splitPayments|consignments|financialStatus|sourceLocation|      sfcc.shipments|customerId|sfcc.created_by|       shippingLines|sourceChannel|addressLine2|leaveAtDoor|addressLine1|
    +--------------------+------------+---------+--------------------+-------------+-----------------------------+--------------------+--------------------+--------------------+------------+---------------+--------------+--------------------+----------+---------------+--------------------+-------------+------------+-----------+------------+
    |               XXXXX|       false|    XXXXX|2020-04-15T11:16:...|        XXXXX|                         true|2020-04-13T14:02:...|2020-04-13T14:01:...|[{"price":"XXXXX"...|   ["XXXXX"]|          XXXXX|         XXXXX|{"shipments":[{"i...|     XXXXX|          XXXXX|{"code":"XXXXX","...|        XXXXX|       XXXXX|      false|       XXXXX|
    |                null|        null|     null|                null|         null|                         null|2020-04-13T14:02:...|                null|                null|        null|           null|          null|                null|      null|          AAAAA|{"code":"BBBBB","...|         CCCC|        null|      false|        null|
    +--------------------+------------+---------+--------------------+-------------+-----------------------------+--------------------+--------------------+--------------------+------------+---------------+--------------+--------------------+----------+---------------+--------------------+-------------+------------+-----------+------------+