I'm having trouble reading items from Cosmos DB in Databricks. It seems to read the JSON as a string value, and I'm struggling to get the data out of it and into columns.
I have a column called ProductRanges with the following values in a row:
[
  {
    "name": "Red",
    "min": 0,
    "max": 99,
    "value": "Order More"
  },
  {
    "name": "Amber",
    "min": 100,
    "max": 499,
    "value": "Stock OK"
  },
  {
    "name": "Green",
    "min": 500,
    "max": 1000000,
    "value": "Overstocked"
  }
]
In Cosmos DB the JSON document is valid, but when importing the data the datatype in the dataframe is a string, not a JSON object/struct as I would expect.
I would like to be able to count the number of times "name" comes up and iterate through the entries to get the min, max and value items, as the number of ranges we can have can be more than 3. I've been through a few posts on Stack Overflow and other places but I'm stuck on the formatting. I've tried to use explode and to read the schema based on the column values, but it says 'invalid document'. I think it may be due to PySpark needing {} at the start and the end, but even concatenating that in the SQL query from Cosmos DB still ends up with a datatype of string.
Any pointers would be appreciated.
I see you retrieved JSON documents from Azure Cosmos DB and converted them to a PySpark DataFrame, but the nested JSON array was not transformed into a JSON object in a DataFrame column as you expected. That is because there is no JSON type defined in the pyspark.sql.types module.
While trying to solve this, I found the article "PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame", which is a suitable solution for your case and matches what you want.
The article shows how to use ArrayType, StructType, StructField and other base PySpark datatypes to convert a JSON string in a column into a composite datatype that is easier to process in PySpark, by defining the column schema and a UDF.
Here is a summary of the sample code. Hope it helps.
source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
JSON is read into a data frame through sqlContext. The output is:
+------+--------------------+
|attr_1|              attr_2|
+------+--------------------+
|     1|[{"a":1,"b":1},{"...|
|     2|[{"a":3,"b":3},{"...|
+------+--------------------+
root
 |-- attr_1: long (nullable = true)
 |-- attr_2: string (nullable = true)
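The step that loads the sample into a data frame is not shown in the summary. A minimal sketch of one way it might be done is below. It assumes an existing SparkSession is passed in as `spark` (on Databricks one is normally predefined), and the helper names `to_json_lines` and `build_dataframe` are made up for illustration, not taken from the article.

```python
import json

# Sample records from the answer: attr_2 holds a JSON array encoded as a string.
source = [
    {"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"},
    {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"},
]


def to_json_lines(records):
    """Serialize each record to one JSON document per string (hypothetical helper)."""
    return [json.dumps(record) for record in records]


def build_dataframe(spark, records):
    """Read the records as JSON; `spark` is assumed to be an existing SparkSession."""
    rdd = spark.sparkContext.parallelize(to_json_lines(records))
    # attr_2 stays a string here, because its value is a quoted JSON string.
    return spark.read.json(rdd)
```

Calling `build_dataframe(spark, source)` followed by `show()` and `printSchema()` should give a frame of the shape shown above, with attr_2 still typed as a string.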
Then convert the attr_2 column by defining the column schema and a UDF.
# Function to convert a JSON array string to a list of (a, b) tuples
import json
def parse_json(array_str):
    json_obj = json.loads(array_str)
    # Return a list (not a generator) so the result matches the ArrayType schema
    return [(item["a"], item["b"]) for item in json_obj]
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([
    StructField('a', IntegerType(), nullable=False),
    StructField('b', IntegerType(), nullable=False)]))
# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(parse_json, json_schema)
# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()
The output is as follows:
+------+--------------+
|attr_1|        attr_2|
+------+--------------+
|     1|[[1,1], [2,2]]|
|     2|[[3,3], [4,4]]|
+------+--------------+
root
 |-- attr_1: long (nullable = true)
 |-- attr_2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: integer (nullable = false)
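For the ProductRanges column in the question, the same idea might look like the sketch below. It swaps the UDF for the built-in `from_json` function, which as far as I know accepts an ArrayType schema in Spark 2.2 and later, then uses `size` to count the ranges and `explode` to get one row per range. The helper names `parse_ranges` and `explode_ranges`, the intermediate column names, and the choice of LongType for min and max are assumptions for illustration only.

```python
import json

# Sample value copied from the question's ProductRanges column.
PRODUCT_RANGES = json.dumps([
    {"name": "Red", "min": 0, "max": 99, "value": "Order More"},
    {"name": "Amber", "min": 100, "max": 499, "value": "Stock OK"},
    {"name": "Green", "min": 500, "max": 1000000, "value": "Overstocked"},
])


def parse_ranges(array_str):
    """Plain-Python parser, usable as a UDF body or to check the data locally."""
    if array_str is None:
        return None
    return [(r["name"], r["min"], r["max"], r["value"])
            for r in json.loads(array_str)]


def explode_ranges(df, column="ProductRanges"):
    """Hypothetical helper: parse the string column and return one row per range."""
    # Imported here so parse_ranges can be used without Spark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import (ArrayType, LongType, StringType,
                                   StructField, StructType)

    schema = ArrayType(StructType([
        StructField("name", StringType()),
        StructField("min", LongType()),   # assumed integer-valued
        StructField("max", LongType()),   # assumed integer-valued
        StructField("value", StringType()),
    ]))
    parsed = df.withColumn("ranges", F.from_json(F.col(column), schema))
    return (parsed
            .withColumn("range_count", F.size("ranges"))  # number of ranges
            .withColumn("rng", F.explode("ranges"))       # one row per range
            .select("range_count", "rng.name", "rng.min", "rng.max", "rng.value"))
```

Because the schema describes an array of structs, the string does not need to be wrapped in {} first. The number of ranges per document can vary, since `size` and `explode` work on whatever length the array has.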