Search code examples
jsonpysparkdatabricksazure-databricks

Using Pyspark to read JSON items from an array?


I'm having some issues with reading items from Cosmos DB in databricks, it seems to read the JSON as a string value, and having some issues getting the data out of it to columns.

I have a column called ProductRanges with the following values in a row:

[   {
        "name": "Red",
        "min": 0,
        "max": 99,
        "value": "Order More"
    },
    {
        "name": "Amber",
        "min": 100,
        "max": 499,
        "value": "Stock OK"
    },
    {
        "name": "Green",
        "min": 500,
        "max": 1000000,
        "value": "Overstocked"
    }
]

In Cosmos DB the JSON document is valid, out when importing the data the datatype in the dataframe is a string, not a JSON object/struct as I would expect.

I would like the be able to get count the number of times "name" comes up and iterate through them the get the min, max and value items, as the number of ranges that we can have can be more than 3. I've been though a few post on stackoverflow and other places but stuck on the formatting. I've tried to use the explode and read the schema based in the column values, but it does say 'in vaild document', think it may be due to Pyspark needing {} at the start and the end, but even concatenating that in the SQL query from cosmos db still ends up as the datatype of string.

Any pointers would be appreciated


Solution

  • I see you retrieved JSON documents from Azure CosmosDB and convert them to PySpark DataFrame, but the nested JSON document or array could not be transformed as a JSON object in a DataFrame column as you expected, because there is not a JSON type defined in pyspark.sql.types module, as below.

    enter image description here

    I searched a document PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame which be a suitable solution for your current case, even the same as you want, while I was trying to solve it.

    The document above shows how to use ArrayType, StructType, StructField and other base PySpark datatypes to convert a JSON string in a column to a combined datatype which can be processed easier in PySpark via define the column schema and an UDF.

    Here is the summary of sample code. Hope it helps.

    source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
    

    JSON is read into a data frame through sqlContext. The output is:

    +------+--------------------+
    
    |attr_1|              attr_2|
    
    +------+--------------------+
    
    |     1|[{"a":1,"b":1},{"...|
    
    |     2|[{"a":3,"b":3},{"...|
    
    +------+--------------------+
    
    
    root
      |-- attr_1: long (nullable = true)
      |-- attr_2: string (nullable = true)
    

    Then, to convert the attr_2 column via define column schema and UDF.

    # Function to convert JSON array string to a list
    import json
    
    def parse_json(array_str):
        json_obj = json.loads(array_str)
        for item in json_obj:
            yield (item["a"], item["b"])
    
    # Define the schema
    from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
    
    json_schema = ArrayType(StructType([StructField('a', IntegerType(
    ), nullable=False), StructField('b', IntegerType(), nullable=False)]))
    
    # Define udf
    from pyspark.sql.functions import udf
    
    udf_parse_json = udf(lambda str: parse_json(str), json_schema)
    
    # Generate a new data frame with the expected schema
    
    df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
    df_new.show()
    df_new.printSchema()
    

    The output is as the following:

    +------+--------------+
    
    |attr_1|        attr_2|
    
    +------+--------------+
    
    |     1|[[1,1], [2,2]]|
    
    |     2|[[3,3], [4,4]]|
    
    +------+--------------+
    
    
    root
      |-- attr_1: long (nullable = true)
      |-- attr_2: array (nullable = true)
      |    |-- element: struct (containsNull = true)
      |    |    |-- a: integer (nullable = false)
      |    |    |-- b: integer (nullable = false)