json, dataframe, pyspark, azure-cosmosdb, azure-databricks

Serialize PySpark DataFrame as JSON array


I've got a DataFrame in Azure Databricks using PySpark. I need to serialize it as JSON into one or more files. Those files will eventually be uploaded to Cosmos, so it's vital for the JSON to be well-formed.

I know how to connect to Cosmos and serialize the data directly, but I'm required to create the JSON files for upload to Cosmos at a later time.


I can't share data from my actual DataFrame, but its structure is complex. Each row has embedded objects, and some of those have their own embedded objects and arrays of objects.

I assume the problem is with how I'm trying to serialize the data, not how I've transformed it. I've created this simple DataFrame, df, which I think will suffice as an example.

+---------+-------------+
|property1|       array1|
+---------+-------------+
|   value1|["a","b","c"]|
|   value2|["x","y","z"]|
+---------+-------------+
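
For anyone who wants to reproduce this, a minimal stand-in for df can be built like so (assuming array1 holds strings; the real DataFrame is more complex):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# minimal stand-in for the example DataFrame above
df = spark.createDataFrame(
    [("value1", ["a", "b", "c"]),
     ("value2", ["x", "y", "z"])],
    ["property1", "array1"]
)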

I serialize it to Azure Data Lake Storage Gen2 like so.

df.coalesce(1).write.json(outpath, lineSep=",")

The file will contain the JSON below. The rows are not elements in an array, and the last row has a trailing comma, so this JSON will not cooperate with Cosmos.

{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]},

This JSON uploads as expected.

[{"property1":"value1","array1":["a","b","c"]},
{"property1":"value2","array1":["x","y","z"]}]

I've successfully uploaded single JSON objects (i.e. without enclosing []), so any solution that writes each DataFrame row to its own JSON file is a potential winner.

I've tried that by repartitioning, but there are always files with multiple rows in them.
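
(The repartition attempt looked roughly like the line below; this is a sketch rather than the exact code, but even with one partition requested per row, some output files still ended up with more than one row.)

df.repartition(df.count()).write.json(outpath + "repartitioned/")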


Solution

  • I came up with two methods.

    The first collects the rows as a list of JSON strings using df.toJSON().collect(), slices that list into batches, then writes each batch as a JSON array string.

    def batchWriteDataFrame(dataframe):
      # collect every row of the DataFrame as a JSON string
      rows = dataframe.toJSON().collect()
      # slice the rows into batches of batch_size (defined elsewhere in the notebook)
      batches = [rows[i * batch_size:(i + 1) * batch_size] for i in range((len(rows) + batch_size - 1) // batch_size)]

      # write each batch to its own file as a well-formed JSON array
      for batch_num, batch in enumerate(batches, start=1):
        dbutils.fs.put(outpath + "batch/" + str(batch_num) + ".json", "[" + ",".join(batch) + "]")
    
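    For example, with batch_size and outpath defined elsewhere in the notebook (the values below are only placeholders), the call is simply:

    batch_size = 500  # placeholder; choose a size that suits the Cosmos upload
    outpath = "abfss://container@account.dfs.core.windows.net/json/"  # placeholder ADLS Gen2 path

    batchWriteDataFrame(df)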

    The second writes each row to its own file.

    def writeDataFrameRows(dataframe):
      # write each row's JSON string to its own file
      for i, row in enumerate(dataframe.toJSON().collect()):
        dbutils.fs.put(outpath + "single/" + str(i) + ".json", row)
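
    Usage is the same (assuming df and outpath from above):

    writeDataFrameRows(df)

    Note that dbutils.fs.put raises an error if the target file already exists unless overwrite=True is passed.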