Tags: python, json, mongodb, pyspark

Dynamically infer schema of JSON data using PySpark


I have a MongoDB server from which I load data into a PySpark DataFrame. However, because of some differences between the systems (I also tried legacy data types), I can't load the data directly from MongoDB, so I have to provide an external schema. The problem is that one attribute (the steps column) has nested attributes, so I can't provide an exact schema for it.

So, while loading, I simply mark that column as StringType() (roughly as in the sketch below) and later try to infer the proper schema for the column and its nested structure.
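
For context, a minimal sketch of how the load looks, assuming the MongoDB Spark connector v10.x; the format name, option keys, and field list here are illustrative and may differ in your setup:

from pyspark.sql.types import StructType, StructField, StringType

# External schema: keep the nested "steps" column as a raw JSON string for now
external_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("steps", StringType(), True),
])

df = (
    spark.read.format("mongodb")                  # "mongo" on older connector versions
         .option("connection.uri", "mongodb://host:27017")
         .option("database", "mydb")              # hypothetical database/collection names
         .option("collection", "mycollection")
         .schema(external_schema)
         .load()
)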

My steps data looks like this:

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|steps                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"id": "selfie", "status": 200, "startedAt": "2024-08-01T11:24:43.698Z", "completedAt": "2024-08-01T11:24:43.702Z", "startCount": 0, "cacheHit": false, "data": {"selfiePhotoUrl": ""}, "inner": {"isSelfieFraudError": false}}]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|[{"id": "ip-validation", "status": 200, "startedAt": "2024-08-01T11:03:01.233Z", "completedAt": "2024-08-01T11:03:01.296Z", "startCount": 0, "cacheHit": false, "data": {"country": "Botswana", "countryCode": "BW", "region": "Gaborone", "regionCode": "GA", "city": "Gaborone", "zip": "", "latitude": -24.6437, "longitude": 25.9112, "safe": true, "ipRestrictionEnabled": false, "vpnDetectionEnabled": false, "platform": "web_mobile"}}, {"id": "liveness", "status": 200, "startedAt": "2024-08-01T11:22:29.787Z", "completedAt": "2024-08-01T11:22:30.609Z", "startCount": 1, "cacheHit": false, "data": {"videoUrl": "", "spriteUrl": "", "selfieUrl": {"media": "", "isUrl": true}}, "inner": {}}]|
|[{"id": "ip-validation", "status": 200, "startedAt": "2024-08-01T11:24:40.251Z", "completedAt": "2024-08-01T11:24:40.285Z", "startCount": 0, "cacheHit": false, "data": {"country": "Mexico", "countryCode": "MX", "region": "Mexico City", "regionCode": "CMX", "city": "Mexico City", "zip": "03020", "latitude": 19.4203, "longitude": -99.1193, "safe": true, "ipRestrictionEnabled": false, "vpnDetectionEnabled": false, "platform": ""}}]                                                                                                                                                                                                                                                                                                                                                                                                                              |
|[{"id": "liveness", "status": 200, "startedAt": "2024-07-31T20:57:54.206Z", "completedAt": "2024-07-31T20:57:55.762Z", "startCount": 1, "cacheHit": false, "data": {"videoUrl": "", "spriteUrl": "", "selfieUrl": {"media": "", "isUrl": true}}, "inner": {}}]                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

As you can see, the data attribute is nested and its fields are not fixed across records.

I tried the code below, but it works only for the first record since I am using head():

from pyspark.sql import functions as F

# Infers the schema from the first record only
schema = F.schema_of_json(df.select('steps').head()[0])
df1 = df.select("_id", "steps").withColumn("steps", F.from_json("steps", schema))

Now, this works for the first record, but the same schema is then forced onto every other record: any additional attributes are truncated and simply don't appear in the result. Have a look at the data attribute in the output.

For example, I am getting the output below:

{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:24:43.702Z","data":{"selfiePhotoUrl":""},"id":"selfie","inner":{"isSelfieFraudError":false},"startCount":0,"startedAt":"2024-08-01T11:24:43.698Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:03:01.296Z","data":{},"id":"ip-validation","startCount":0,"startedAt":"2024-08-01T11:03:01.233Z","status":200},{"cacheHit":false,"completedAt":"2024-08-01T11:22:30.609Z","data":{},"id":"liveness","inner":{},"startCount":1,"startedAt":"2024-08-01T11:22:29.787Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:24:40.285Z","data":{},"id":"ip-validation","startCount":0,"startedAt":"2024-08-01T11:24:40.251Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-07-31T20:57:55.762Z","data":{},"id":"liveness","inner":{},"startCount":1,"startedAt":"2024-07-31T20:57:54.206Z","status":200}]}
{"_id":"","steps":[]}

_id is an additional column that I have included in the output; it should be ignored.

I also tried sampling the records, but the method I used works on only a single record. So how can I infer the schema dynamically? Is there a more optimal way?


Solution

  • You can do something like the following:

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import ArrayType

    # Infer the element schema from every row, then wrap it in an array type
    # because each "steps" value is a JSON array of step objects
    schema = spark.read.json(df.rdd.map(lambda row: row['steps'])).schema
    array_of_steps_schema = ArrayType(schema, True)
    df1 = df.withColumn('steps', from_json(col('steps'), array_of_steps_schema))
    

    This gives me the correct schema, and I don't see any data loss either; a quick sanity check is sketched below.
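
    For instance (the nested field names like data and id come from the sample rows above; inline is the Spark SQL generator that expands an array of structs into rows):

    df1.printSchema()
    # Expand each step into its own row and inspect the nested "data" struct
    df1.selectExpr("_id", "inline(steps)").select("id", "data.*").show(truncate=False)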

    The reason for this approach is that I can't really trust sampling in this case, because I don't know how it works internally or whether the chosen sample would be representative of every possible nested attribute.
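
    If scanning every row ever becomes too expensive, a sampled variant of the same idea is possible, with the caveat above that a sample may miss rarely occurring nested attributes (the fraction and seed below are arbitrary):

    # Hypothetical cheaper variant: infer the element schema from a random sample of rows
    sampled_schema = spark.read.json(
        df.sample(fraction=0.1, seed=42).rdd.map(lambda row: row['steps'])
    ).schema
    sampled_steps_schema = ArrayType(sampled_schema, True)   # used with from_json as above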