I've got a DataFrame in Azure Databricks using PySpark. I need to serialize it as JSON into one or more files. Those files will eventually be uploaded to Cosmos, so it's vital for the JSON to be well-formed.
I know how to connect to Cosmos and write the data directly, but I'm required to create the JSON files for upload to Cosmos at a later time.
I can't share data from my actual DataFrame, but the structure is complex: each row has embedded objects, and some of those have their own embedded objects and arrays of objects.
I assume the problem is with how I'm trying to serialize the data, not how I've transformed it. I've created this simple DataFrame, df, which I think will suffice as an example.
+---------+-------------+
|property1| array1|
+---------+-------------+
| value1|["a","b","c"]|
| value2|["x","y","z"]|
+---------+-------------+
I serialize it to Azure Data Lake Storage Gen2 like so.
df.coalesce(1).write.json(outpath, lineSep=",")
The file will contain the JSON below. Spark's write.json emits one object per record (the JSON Lines format), and lineSep only changes the record separator. The rows are not elements of an array, and the last row has a trailing comma, so this JSON will not cooperate with Cosmos.
{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]},
By contrast, this JSON uploads as expected.
[{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]}]
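A quick plain-Python check (no Spark required) confirms the difference: the comma-separated form produced by lineSep="," isn't a parseable JSON document, while the bracketed form is.

```python
import json

# What write.json(outpath, lineSep=",") produces: bare objects separated
# by commas, with a trailing comma - not a JSON document.
invalid = '{"property1":"value1","array1":["a","b","c"]},{"property1":"value2","array1":["x","y","z"]},'

# The same rows wrapped in brackets, without the trailing comma,
# form a valid JSON array.
valid = '[{"property1":"value1","array1":["a","b","c"]},{"property1":"value2","array1":["x","y","z"]}]'

try:
    json.loads(invalid)
    parses = True
except json.JSONDecodeError:
    parses = False

print(parses)                  # the lineSep output does not parse: False
print(len(json.loads(valid)))  # the array form parses to 2 objects
```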
I've successfully uploaded single JSON objects (i.e. without [] enclosing them), so any solution that writes each DataFrame row to its own JSON file is a potential winner.
I've tried that by repartitioning but there are always files with multiple rows in them.
I came up with two methods.
The first creates a list of JSON string rows using df.toJSON().collect(), slices the list into batches, then builds a JSON array string for each batch.
def batchWriteDataFrame(dataframe):
    rows = dataframe.toJSON().collect()
    # slice the rows into batches
    batches = [rows[i * batch_size:(i + 1) * batch_size] for i in range((len(rows) + batch_size - 1) // batch_size)]
    batch_num = 1
    for batch in batches:
        dbutils.fs.put(outpath + "batch/" + str(batch_num) + ".json", "[" + ",".join(batch) + "]")
        batch_num += 1
The second writes each row to its own file.
def writeDataFrameRows(dataframe):
    for i, row in enumerate(dataframe.toJSON().collect()):
        dbutils.fs.put(outpath + "single/" + str(i) + ".json", row)
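For local testing outside Databricks, the same per-row loop can be exercised with pathlib standing in for dbutils.fs.put (a sketch under that assumption, not the Databricks API, and again with made-up row strings):

```python
import json
import tempfile
from pathlib import Path

# Stand-ins for dataframe.toJSON().collect() output (hypothetical rows).
rows = [
    '{"property1":"value1","array1":["a","b","c"]}',
    '{"property1":"value2","array1":["x","y","z"]}',
]

outdir = Path(tempfile.mkdtemp()) / "single"
outdir.mkdir(parents=True)

# Mirror writeDataFrameRows: one file per row, named by index.
for i, row in enumerate(rows):
    (outdir / f"{i}.json").write_text(row)

# Each file then holds exactly one well-formed JSON object.
for path in sorted(outdir.glob("*.json")):
    json.loads(path.read_text())  # parses without error
```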