I have a MongoDB server from which I load data into a PySpark DataFrame. However, due to some differences between the two systems (I also tried legacy data types), I can't load the data directly from MongoDB, so I have to provide an external schema. The problem is that one attribute (the steps column) has nested attributes, so I can't provide an exact schema for it. While loading, I simply mark that column as StringType() and later try to properly infer the schema for that column and its nested structure.
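For context, my load step looks roughly like the below (a minimal sketch, assuming the MongoDB Spark connector 10.x; the database and collection names are placeholders):

from pyspark.sql.types import StructType, StructField, StringType

# external schema: keep the nested steps column as a raw JSON string for now
external_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("steps", StringType(), True),
])

df = (spark.read.format("mongodb")        # "mongo" in connector versions before 10.x
      .option("database", "mydb")         # placeholder
      .option("collection", "events")     # placeholder
      .schema(external_schema)
      .load())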
My steps data looks like this:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|steps |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"id": "selfie", "status": 200, "startedAt": "2024-08-01T11:24:43.698Z", "completedAt": "2024-08-01T11:24:43.702Z", "startCount": 0, "cacheHit": false, "data": {"selfiePhotoUrl": ""}, "inner": {"isSelfieFraudError": false}}] |
|[{"id": "ip-validation", "status": 200, "startedAt": "2024-08-01T11:03:01.233Z", "completedAt": "2024-08-01T11:03:01.296Z", "startCount": 0, "cacheHit": false, "data": {"country": "Botswana", "countryCode": "BW", "region": "Gaborone", "regionCode": "GA", "city": "Gaborone", "zip": "", "latitude": -24.6437, "longitude": 25.9112, "safe": true, "ipRestrictionEnabled": false, "vpnDetectionEnabled": false, "platform": "web_mobile"}}, {"id": "liveness", "status": 200, "startedAt": "2024-08-01T11:22:29.787Z", "completedAt": "2024-08-01T11:22:30.609Z", "startCount": 1, "cacheHit": false, "data": {"videoUrl": "", "spriteUrl": "", "selfieUrl": {"media": "", "isUrl": true}}, "inner": {}}]|
|[{"id": "ip-validation", "status": 200, "startedAt": "2024-08-01T11:24:40.251Z", "completedAt": "2024-08-01T11:24:40.285Z", "startCount": 0, "cacheHit": false, "data": {"country": "Mexico", "countryCode": "MX", "region": "Mexico City", "regionCode": "CMX", "city": "Mexico City", "zip": "03020", "latitude": 19.4203, "longitude": -99.1193, "safe": true, "ipRestrictionEnabled": false, "vpnDetectionEnabled": false, "platform": ""}}] |
|[{"id": "liveness", "status": 200, "startedAt": "2024-07-31T20:57:54.206Z", "completedAt": "2024-07-31T20:57:55.762Z", "startCount": 1, "cacheHit": false, "data": {"videoUrl": "", "spriteUrl": "", "selfieUrl": {"media": "", "isUrl": true}}, "inner": {}}] |
|[] |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
As you can see, the data attribute is a nested type, and its fields are not even fixed; they vary from step to step.
I tried the code below, but it only works for the first record, since I am using head():

from pyspark.sql import functions as F

# infers the schema from the first row only
schema = F.schema_of_json(df.select('steps').head()[0])
df1 = df.select("_id", "steps").withColumn("steps", F.from_json("steps", schema))
Now, this works for the first record, but the same schema is then forced onto every other record: any additional attributes they contain are truncated and simply don't appear. Have a look at the data attribute in the output. For example, I am getting the output below:
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:24:43.702Z","data":{"selfiePhotoUrl":""},"id":"selfie","inner":{"isSelfieFraudError":false},"startCount":0,"startedAt":"2024-08-01T11:24:43.698Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:03:01.296Z","data":{},"id":"ip-validation","startCount":0,"startedAt":"2024-08-01T11:03:01.233Z","status":200},{"cacheHit":false,"completedAt":"2024-08-01T11:22:30.609Z","data":{},"id":"liveness","inner":{},"startCount":1,"startedAt":"2024-08-01T11:22:29.787Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-08-01T11:24:40.285Z","data":{},"id":"ip-validation","startCount":0,"startedAt":"2024-08-01T11:24:40.251Z","status":200}]}
{"_id":"","steps":[{"cacheHit":false,"completedAt":"2024-07-31T20:57:55.762Z","data":{},"id":"liveness","inner":{},"startCount":1,"startedAt":"2024-07-31T20:57:54.206Z","status":200}]}
{"_id":"","steps":[]}
_id is an additional column that I have included in the output; it should be ignored.
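To confirm that the schemas really do differ from record to record, I compared what schema_of_json infers for two different rows (a quick illustrative check, not part of my pipeline):

rows = df.select('steps').take(2)

# schema_of_json evaluates to a DDL string describing one sample's structure
for r in rows:
    print(spark.range(1).select(F.schema_of_json(F.lit(r[0]))).head()[0])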
I was also trying to sample the records, but the method I used works for only a single record. So how can I infer the schema dynamically? Is there an optimal way?
You can do something like this:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType

# spark.read.json explodes each JSON array and infers a merged element schema
schema = spark.read.json(df.rdd.map(lambda row: row['steps'])).schema
# re-wrap the element struct in an array to match the column's shape
array_of_steps_schema = ArrayType(schema, True)
df1 = df.withColumn('steps', from_json(col('steps'), array_of_steps_schema))
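Once parsed, the nested fields become directly addressable. For example, based on the sample data above (data.country exists only for ip-validation steps, so it will be null elsewhere):

from pyspark.sql.functions import explode

# one row per step, nested fields reachable with dot notation
df1.select('_id', explode('steps').alias('step')) \
   .select('_id', 'step.id', 'step.status', 'step.data.country') \
   .show()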
This gives me the correct schema, and I don't see any data loss either. The reason for this approach is that I can't really trust sampling in this case: I don't know exactly how it works under the hood, or whether a given sample would be unbiased.
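For completeness, the sampling-based alternative would look like the below, using the samplingRatio parameter of the JSON reader. It is cheaper on large datasets, but a field that never shows up in the sampled records will be silently missing from the inferred schema, which is exactly the risk described above:

# infer from roughly 10% of the records instead of all of them
sampled_schema = spark.read.json(
    df.rdd.map(lambda row: row['steps']),
    samplingRatio=0.1,
).schema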