
Pass-through heterogeneous (non-uniform) JSON column in Spark


We are processing data with Apache Spark 3.1.x where one field contains fully free-form JSON, so individual records can contain the same keys but with different datatypes (payload.field1 in this example can be a string, a boolean, or a number):

{"timestamp": "2021-07-30T09:41:51Z", "payload": {"field1": "some text"}}
{"timestamp": "2021-07-30T09:41:52Z", "payload": {"field1": true}}
{"timestamp": "2021-07-30T09:41:53Z", "payload": {"field1": 123}}

Our goal is to keep the payload field intact. When we let Spark autodetect the schema:

 Dataset<Row> events = spark.read().json("file:////users/user/input.json");

 // some processing is going on here

 events.write().json("file:///users/user/output.json");

The output is as follows (notice that payload.field1 is now always a string):

{"payload":{"field1":"some text"},"timestamp":"2021-07-30T09:41:51Z"}
{"payload":{"field1":"true"},"timestamp":"2021-07-30T09:41:52Z"}
{"payload":{"field1":"123"},"timestamp":"2021-07-30T09:41:53Z"}

Spark's printSchema() output:

root
 |-- payload: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |-- timestamp: string (nullable = true)

The best workaround we have come up with so far is this:

Dataset<String> eventsAsString = spark.read().text("file:////users/user/input.json").as(Encoders.STRING());

Dataset<Row> events2 = eventsAsString.select( //
        get_json_object(col("value"), "$.timestamp").alias("timestamp"), //
        get_json_object(col("value"), "$.payload").alias("payload") // This will keep payload as string for Spark
);

// Do some processing of events here

// We have to assemble the JSON line ourselves and write it as text; otherwise Spark would
// write payload as an escaped JSON string instead of a nested object:
events2.withColumn("joined", concat( //
        format_string("{\"timestamp\":\"%s\", ", col("timestamp")), //
        format_string("\"payload\":%s}", col("payload")) //
)).select(col("joined")).write().text("file:///users/user/output.txt"); 
   

The output is what we want; the datatypes are unchanged:

{"timestamp":"2021-07-30T09:41:51Z", "payload":{"field1":"some text"}}
{"timestamp":"2021-07-30T09:41:52Z", "payload":{"field1":true}}
{"timestamp":"2021-07-30T09:41:53Z", "payload":{"field1":123}}

The solution above works but feels super hacky. Maybe we are missing something obvious here?

Thanks in advance!


Solution

  • For reading, we can specify the schema up front so that payload is kept as a raw string. For writing, I haven't come up with a better idea than building the line by hand.

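        import org.apache.spark.sql.{Dataset, Row}
        import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
        import spark.implicits._ // provides the Encoder[String] that map needs

        val schema = StructType(Array[StructField](
          StructField("timestamp", DataTypes.StringType, false, Metadata.empty),
          StructField("payload", DataTypes.StringType, false, Metadata.empty)) // force string type
        )

        // ds is the input (a path or a Dataset[String] of JSON lines)
        val df: Dataset[Row] = spark.read.schema(schema).json(ds)

        // Build each output line by hand so that payload is written as raw JSON
        df.map(r => "{\"timestamp\": \"%s\", \"payload\": %s}".format(r.getString(0), r.getString(1)))
          .write.text("xxx")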
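
Since the question uses Java, the per-row string building from the snippet above can be sketched in plain Java, without Spark, to make its assumptions visible. The class and method names (JsonLines, escapeJsonString, toJsonLine) are hypothetical and not part of any Spark API. The sketch assumes that payload already holds valid JSON text, which is embedded verbatim, and it additionally escapes the timestamp, which neither the format_string version nor the Scala one-liner does.

```java
// Hypothetical helper, not part of Spark: builds one JSON output line by hand.
final class JsonLines {
    private JsonLines() {}

    /** Escapes a string so it can be placed between double quotes in JSON. */
    static String escapeJsonString(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 2);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be written as \\uXXXX
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /**
     * Builds one output line. payloadJson is assumed to already be valid JSON
     * text and is embedded verbatim, so the types of its values are untouched.
     * A null timestamp or payload is written as JSON null.
     */
    static String toJsonLine(String timestamp, String payloadJson) {
        String ts = (timestamp == null) ? "null" : "\"" + escapeJsonString(timestamp) + "\"";
        String payload = (payloadJson == null) ? "null" : payloadJson;
        return "{\"timestamp\":" + ts + ",\"payload\":" + payload + "}";
    }
}
```

For example, JsonLines.toJsonLine("2021-07-30T09:41:52Z", "{\"field1\":true}") returns {"timestamp":"2021-07-30T09:41:52Z","payload":{"field1":true}}. In a Spark job, a function like this could presumably be called from a typed map over the rows, but that wiring is left out here.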