json, parquet, azure-databricks

How do I convert a column of JSON strings into a parquet table


I am trying to convert some data that I am receiving into a parquet table that I can eventually use for reporting, but I feel like I am missing a step.

I receive CSV files with the format "id", "event", "source", where the "event" column is a GZIP-compressed JSON string. I've been able to set up a dataframe that extracts the three columns, including unzipping the JSON string. So I now have a table with

id | event | source | unencoded_event

where unencoded_event is the decompressed JSON string.
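
For reference, a minimal sketch of that setup (the file path is illustrative, and I'm assuming here that the "event" column holds base64-encoded GZIP; the real decoding step matches however the payload actually arrives):

import gzip, base64
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

df = spark.read.csv("/mnt/raw/events/*.csv", header=True)

# Decompress the GZIP'd payload into a plain JSON string column.
@F.udf(StringType())
def unzip_event(payload):
    return gzip.decompress(base64.b64decode(payload)).decode("utf-8")

df = df.withColumn("unencoded_event", unzip_event(F.col("event")))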

What I'd like to do at this point is take that one string column of JSON and parse it out into individual columns. Based on a comment from another developer (that the process of converting to parquet is smart enough to just use the first row of my results to figure out the schema), I've tried this:

df1 = spark.read.json(df.select("unencoded_event").rdd).write.format("parquet").saveAsTable("test")

But this just gives me a single-column table whose only column, _corrupt_record, contains the original JSON string again.

What I'm trying to get to is to take schema:

{
  "agent"
  --"name"
  --"organization"
  "entity"
  --"name"
  ----"type"
  ----"value"
}

And get the table to, ultimately, look like: AgentName | Organization | EventType | EventValue

Is the step I'm missing just explicitly defining the schema, or have I oversimplified my approach?

One potential complication here: the JSON schema is actually more involved than the above; I've been assuming I can expand the full schema out into a wider table and then just return the smaller set of columns I care about.

I have also tried taking a single result from the file (so, a single JSON string), saving it as a JSON file, and reading that back in. Doing so works, i.e., spark.read.json("myJSON.json") parses the string into the arrays I was expecting. This is also true if I copy multiple strings into the file.

This doesn't work if I take my original results and try to save them. If I save just the column of strings as a JSON file

from pyspark.sql.functions import col

dfWrite = df.select(col("unencoded_event"))
dfWrite.write.mode("overwrite").json(write_location)

and then read it back out, it doesn't behave the same way: each row is still treated as a string.
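
My guess at what's happening: writing with .json() wraps each string in an outer object (something like {"unencoded_event": "..."}), so reading the file back only recovers that one string field. Writing the raw strings out as plain text and pointing the JSON reader at that might reproduce the hand-copied file behavior (assuming each JSON string sits on a single line), e.g.:

# Write the raw JSON strings as text lines instead of wrapping them in an outer JSON object.
df.select("unencoded_event").write.mode("overwrite").text(write_location)

# The JSON reader should then parse each line into columns.
dfParsed = spark.read.json(write_location)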


Solution

  • I did find one solution that works. This is not a perfect solution (I'm worried that it's not scalable), but it gets me to where I need to be.

    I can select the data using get_json_object() for each column I want (sorry, I've been fiddling with column names and the like over the course of the day):

    from pyspark.sql.functions import get_json_object

    dfResults = df.select(
        get_json_object("unencoded_event", "$.agent[0].name").alias("userID"),
        get_json_object("unencoded_event", "$.entity[0].identifier.value").alias("itemID"),
        get_json_object("unencoded_event", "$.entity[0].detail[1].value").alias("itemInfo"),
        get_json_object("unencoded_event", "$.recorded").alias("timeStamp"),
    )
    

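    From there, writing the flattened frame out as a parquet table works the same way as in my first attempt, e.g.:

    dfResults.write.format("parquet").mode("overwrite").saveAsTable("test")
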
    The big thing I don't love about this is that it appears I can't use filter/search options with get_json_object(). That's fine for the foreseeable future, because right now I know where all the data should be and don't need to filter.

    I believe I can also use from_json(), but that requires defining the schema within the notebook. This isn't a great option because I only need a small part of the JSON, so it feels like unnecessary effort to define the entire schema. (I also don't have control over what the overall schema would be, so this becomes a maintenance issue.)
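
    If I do end up going the from_json() route later, my understanding is that the schema only needs to cover the fields I actually select, so a partial definition along these lines might be enough (field names here just mirror the paths above and are assumptions about the real payload):

    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType, ArrayType

    # Partial schema: only the fields I care about; anything else in the JSON is ignored.
    partial_schema = StructType([
        StructField("agent", ArrayType(StructType([
            StructField("name", StringType()),
        ]))),
        StructField("recorded", StringType()),
    ])

    dfParsed = df.withColumn("event_struct", F.from_json("unencoded_event", partial_schema))
    dfResults = dfParsed.select(
        F.col("event_struct.agent")[0]["name"].alias("userID"),
        F.col("event_struct.recorded").alias("timeStamp"),
    )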