Search code examples
pythonjsonpysparkazure-cosmosdbazure-databricks

Read json load to pyspark dataframe


I want to ingest data from Azure Cosmos DB; I am using the Python SDK to connect from Databricks.

I want to save the output of json.load(data) into a PySpark DataFrame, because I need to write the data to Databricks Delta Lake. How can I read this data into a PySpark DataFrame? Below are my code and sample data.

{
 "appUuid": "aaaa-bbbb-cccc",
 "SystemId": null,
 "city": "Lancaster",
 "state": "NY",
 "zipCode": "140",
 "field1": "others",
 "field2": "others"
}
{
 "appUuid": "bbbb-dddd-eeee",
 "SystemId": null,
 "city": "Alden ",
 "state": "NY",
 "zipCode": "140",
 "field1": "others",
 "field2": "others"
}
from azure.cosmos import CosmosClient

# Connect to the Cosmos DB account and grab a handle to one container.
# NOTE(review): 'AccountKey' here is a placeholder — the real account key
# should come from a secret scope, not a hard-coded literal.
client = CosmosClient('https://<cosmos_client>.documents.azure.com:443/', credential='AccountKey')
DATABASE_NAME = 'TestDB'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'Test'
container = database.get_container_client(CONTAINER_NAME)

import json
# Iterate the query results; each `item` is already a Python dict.
for item in container.query_items(
         query='SELECT Top 10 * FROM Test',
        enable_cross_partition_query=True):
    # NOTE(review): dumping each dict back to a JSON string is redundant —
    # `item` is usable as-is. Also, `data` is rebound every iteration, so
    # after the loop only the LAST document survives.
    data = json.dumps(item, indent=True)
    print(data)
    print(type(data))

# Parse the (last) JSON string back into a dict — round-trips the dict
# that query_items already returned.
data1 = json.loads(data)
print(data1)
print(type(data1))


from pyspark.sql import *
from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.getOrCreate()

# NOTE(review): spark.read.json expects a path (or an RDD/Dataset of JSON
# strings), not a Python dict — passing `data1` makes Spark treat the dict's
# repr as a file path, hence the URISyntaxException quoted below.
df = spark.read.json(data1) -- I am getting error on this line.
display(df)

I am getting this error:

"IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: {"

Solution

  • You need to properly parse your JSON data

    data = """{
     "appUuid": "aaaa-bbbb-cccc",
     "SystemId": null,
     "city": "Lancaster",
     "state": "NY",
     "zipCode": "140",
     "field1": "others",
     "field2": "others"
    }
    {
     "appUuid": "bbbb-dddd-eeee",
     "SystemId": null,
     "city": "Alden ",
     "state": "NY",
     "zipCode": "140",
     "field1": "others",
     "field2": "others"
    }"""

    import json

    def _split_concatenated_json(text):
        """Yield each document from a stream of back-to-back JSON objects.

        Uses json.JSONDecoder.raw_decode, which parses one complete value
        and reports where it ended — this handles nested objects correctly,
        unlike a regex such as '[^}]+}', and keeps JSON null as Python None
        (no fragile replace("null", '"null"') needed).
        """
        decoder = json.JSONDecoder()
        pos, end = 0, len(text)
        while pos < end:
            # Skip whitespace between consecutive objects.
            while pos < end and text[pos].isspace():
                pos += 1
            if pos >= end:
                break
            doc, pos = decoder.raw_decode(text, pos)
            yield doc

    docs = list(_split_concatenated_json(data))

    # Re-serialize each document and let Spark's JSON reader infer the
    # schema; real JSON nulls become SQL NULLs instead of the string "null".
    df = spark.read.json(sc.parallelize([json.dumps(d) for d in docs]))
    df.show()
    

    Output:

    +--------+--------------+---------+------+------+-----+-------+
    |SystemId|       appUuid|     city|field1|field2|state|zipCode|
    +--------+--------------+---------+------+------+-----+-------+
    |    null|aaaa-bbbb-cccc|Lancaster|others|others|   NY|    140|
    |    null|bbbb-dddd-eeee|   Alden |others|others|   NY|    140|
    +--------+--------------+---------+------+------+-----+-------+