In Foundry, how can I parse a dataframe column that has a JSON response


I am trying to bring JIRA data into Foundry using an external API. When it comes in via Magritte, the data is stored as AVRO, and there is a column called response. The response column contains data that looks like this:


[{"id":"customfield_5","name":"test","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["cf[5]","test"],"schema":{"type":"user","custom":"com.atlassian.jira.plugin.system.customfieldtypes:userpicker","customId":5}},{"id":"customfield_2","name":"test2","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["test2","cf[2]"],"schema":{"type":"option","custom":"com.atlassian.jira.plugin.system.customfieldtypes:select","customId":2}}]

Because this imports as AVRO, the documentation on how to convert this kind of data once it is in Foundry doesn't work. How can I convert this data into individual columns and rows?

Here is the code that I've attempted to use:

from transforms.api import transform_df, Input, Output
from pyspark import SparkContext as sc
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
import json
import pyspark.sql.types as T


@transform_df(
    Output("json output"),
    json_raw=Input("json input"),
)
def my_compute_function(json_raw, ctx):

    sqlContext = SQLContext(sc)

    source = json_raw.select('response').collect()  # noqa

    # Read the list into data frame
    df = sqlContext.read.json(sc.parallelize(source))

    json_schema = T.StructType([
        T.StructField("id", T.StringType(), False),
        T.StructField("name", T.StringType(), False),
        T.StructField("custom", T.StringType(), False),
        T.StructField("orderable", T.StringType(), False),
        T.StructField("navigable", T.StringType(), False),
        T.StructField("searchable", T.StringType(), False),
        T.StructField("clauseNames", T.StringType(), False),
        T.StructField("schema", T.StringType(), False)
    ])

    udf_parse_json = udf(lambda str: parse_json(str), json_schema)

    df_new = df.select(udf_parse_json(df.response).alias("response"))

    return df_new


# Function to convert JSON array string to a list
def parse_json(array_str):
    json_obj = json.loads(array_str)
    for item in json_obj:
        yield (item["a"], item["b"])

Solution

  • Parsing JSON in a string column into a struct column (and then into separate columns) is straightforward with the F.from_json function.

    In your case, you need to do:

    df = df.withColumn("response_parsed", F.from_json("response", json_schema))
    

    Then you can do this or similar to get the contents into different columns:

    df = df.select("response_parsed.*")
    

    However, this won't work as written because your schema is incorrect: each row holds a list of JSON structs, not just one. You need to wrap the schema in T.ArrayType(your_schema), and you also need an F.explode before selecting so that each array element ends up in its own row.

    Another useful function is F.get_json_object, which extracts a single JSON object from a JSON string by path.

    Using a UDF like you've done could work, but UDFs are generally much less performant than native Spark functions.

    Additionally, all the AVRO file format does in this case is merge multiple JSON files into one big file, with each file in its own row, so the example under "Rest API Plugin" - "Processing JSON in Foundry" should work as long as you skip the 'put this schema on the raw dataset' step.