Tags: pyspark, spark-streaming, azure-databricks

Access specific element in array in a string of JSON format


I have some streaming data that can be minimally reduced to the following:

{
    "data":[
        {
            "key":1,
            "val":"a"
        },
        {
            "key":2,
            "val":"b",
            "test":"bla"
        }
    ]
}

from which I need to access the "data" array, which arrives as a string in JSON format. More specifically, I need to find the "val" field of the object where "key" == 2.

So far I have tried:

  • I know that I can access it like this:

    F.get_json_object(...,"$.data[1].val")
    

    but if the order of the objects in the data array changes, this will no longer work.

  • With standard JSONPath syntax I could use:

    F.get_json_object(...,"$.data[?(@.key==2)].val")
    

    but this does not seem to work on Databricks.

  • I tried to dynamically create a struct from the JSON string, but got the error "Queries with streaming sources must be executed with writeStream.start()". I do not want to write the stream anywhere yet, since I am still preprocessing. How could I work around this?

  • I tried to define the struct only for the array, as shown here, but since the elements in the array have varying structures, this does not work.

  • I tried to write a user-defined function that takes the data object containing the JSON string and parses it like so:

    
    from json import loads

    def parse_json(id, idName, keyName, jsonString):
        # Return the value under keyName from the first object
        # whose idName field equals id.
        data = loads(jsonString)
        res = [d[keyName] for d in data if d[idName] == id]
        return res[0]
    

    and tried to call it with jsonString=F.col("data"), where "data" holds the string. But this gives me errors saying it does not find the attribute I put into the id field (see the sketch after this list).
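
The direct call cannot work because F.col("data") is a Column object, not the string it refers to; a plain Python function only receives actual values once it is registered as a UDF. Below is a minimal sketch of that registration, assuming "data" is the column holding the JSON array string:

    from json import loads

    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    @F.udf(returnType=StringType())
    def parse_json(id, idName, keyName, jsonString):
        # Inside a registered UDF, jsonString arrives as a plain str
        # (or None), not as a Column object.
        if jsonString is None:
            return None
        data = loads(jsonString)
        res = [d[keyName] for d in data if d[idName] == id]
        return res[0] if res else None

    # Every argument must be a Column at call time, so literals are
    # wrapped in F.lit(); "data" is assumed to hold the JSON array string.
    df = df.withColumn('val', parse_json(F.lit(2), F.lit('key'), F.lit('val'), F.col('data')))

Note that a Python UDF adds serialization overhead, which is one reason the from_json approach in the solution below is generally preferable.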


Solution

  • One approach is to convert the stringified JSON into an array of structs and then get the value you want.

    Even though the structure varies, as long as it is reasonably stable you can build a schema that covers the fields you want to extract.

    For example, the schema below will parse the test field if it exists, and yield NULL when it doesn't (as for key = 1). Conversely, if you are not interested in the test field, you can omit its StructField and test is simply ignored.

    from pyspark.sql.types import (
        StructType, StructField, ArrayType, IntegerType, StringType
    )

    schema = StructType([
        StructField('data', ArrayType(StructType([
            StructField("key", IntegerType()),
            StructField("val", StringType()),
            StructField("test", StringType()),
            # add more fields that you are interested in
        ])))
    ])
    

    Use this schema in from_json, then extract the field you want.

    # F.filter (available in Spark 3.1+) keeps only the element whose
    # key == 2; indexing the result with [0] yields NULL when nothing matches.
    df = (df.withColumn('data', F.from_json('data', schema))
          .withColumn('data', F.filter(F.col('data').data, lambda x: x.key == 2)[0].val))
    

    If there is no element with key = 2, you will get NULL rather than a hard crash.
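
    For illustration, here is a self-contained sketch of the whole approach on a static DataFrame built from the sample in the question; a streaming DataFrame is transformed identically:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import (
        StructType, StructField, ArrayType, IntegerType, StringType
    )

    spark = SparkSession.builder.getOrCreate()

    # One row whose 'data' column holds the stringified sample JSON.
    df = spark.createDataFrame(
        [('{"data": [{"key": 1, "val": "a"}, {"key": 2, "val": "b", "test": "bla"}]}',)],
        ['data'],
    )

    schema = StructType([
        StructField('data', ArrayType(StructType([
            StructField('key', IntegerType()),
            StructField('val', StringType()),
            StructField('test', StringType()),
        ])))
    ])

    result = (df.withColumn('data', F.from_json('data', schema))
              .withColumn('data', F.filter(F.col('data').data, lambda x: x.key == 2)[0].val))

    result.show()  # expected output: a single row with data = 'b'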