Tags: apache-spark, apache-kafka, spark-structured-streaming, spark-streaming-kafka

Spark Structured Streaming to read nested Kafka Connect jsonConverter message


I have ingested an XML file using the Kafka Connect file-pulse connector 1.5.3. Now I want to read it with Spark Structured Streaming to parse/flatten it, as it is quite nested.

The string I read out of Kafka (I used the console consumer to read it, and inserted a newline before the payload for illustration) is like below:

{
"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"city"},{"type":"array","items":{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"unit"},{"type":"string","optional":true,"field":"value"}],"optional":true,"name":"Value"},"optional":true,"field":"value"}],"optional":true,"name":"ForcedArrayType"},"optional":true,"field":"forcedArrayField"},{"type":"string","optional":true,"field":"lastField"}],"optional":true,"name":"Data","field":"data"}],"optional":true}

,"payload":{"data":{"city":"someCity","forcedArrayField":[{"value":[{"unit":"unitField1","value":"123"},{"unit":"unitField1","value":"456"}]}],"lastField":"2020-08-02T18:02:00"}}
}

The data type I attempted:

    StructType schema = new StructType();
    schema = schema.add( "schema", StringType, false);
    schema = schema.add( "payload", StringType, false);

    StructType Data = new StructType();
    StructType ValueArray = new StructType(new StructField[]{
            new StructField("unit", StringType,true,Metadata.empty()),
            new StructField("value", StringType,true,Metadata.empty())
    });
    StructType ForcedArrayType = new StructType(new StructField[]{
            new StructField("valueArray", ValueArray,true,Metadata.empty())
    });

    Data = Data.add("city",StringType,true);
    Data = Data.add("forcedArrayField",ForcedArrayType,true);
    Data = Data.add("lastField",StringType,true);

    StructType Record = new StructType();
    Record = Record.add("data", Data, false);

The query I attempted:

        //below worked for payload
        Dataset<Row> parsePayload = lines
                .selectExpr("cast (value as string) as json")
                .select(functions.from_json(functions.col("json"), schema=schema).as("schemaAndPayload"))
                .select("schemaAndPayload.payload").as("payload");

        System.out.println(parsePayload.isStreaming());

        //below makes the output empty:
        Dataset<Row> parseValue = parsePayload.select(functions.from_json(functions.col("payload"), Record).as("cols"))
                .select(functions.col("cols.data.city"));
                //.select(functions.col("cols.*"));

        StreamingQuery query = parseValue
                .writeStream()
                .format("console")
                .outputMode(OutputMode.Append())
                .start();
        query.awaitTermination();

When I output the parsePayload stream, I can see the data (still in JSON structure), but when I select a certain field (or all fields), like city above, the output is empty.

Help needed: is the cause a wrongly defined data type, or is the query wrong?

P.S. At the console, when I output parsePayload instead of parseValue, it displays some data, which made me think the payload part worked:

 |{"data":{"city":"...|
...


Solution

  • Your schema definition is not fully correct: forcedArrayField and its nested value field are arrays of structs, but you declared them as plain structs (and the inner field is named value, not valueArray). I replicated your problem and was able to parse the JSON with the following schema:

    import org.apache.spark.sql.types._

    val payloadSchema = new StructType()
      .add("data", new StructType()
        .add("city", StringType)
        .add("forcedArrayField", ArrayType(new StructType()
          .add("value", ArrayType(new StructType()
            .add("unit", StringType)
            .add("value", StringType)))))
        .add("lastField", StringType))
    

    To access the individual fields, I then used the following selection:

    import org.apache.spark.sql.functions
    import org.apache.spark.sql.functions.{col, explode}

    val parsePayload = df
        .selectExpr("cast (value as string) as json")
        .select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
        .select("schemaAndPayload.payload")
        .select(functions.from_json(functions.col("payload"), payloadSchema).as("cols"))
        .select(
          col("cols.data.city").as("city"),
          explode(col("cols.data.forcedArrayField")).as("forcedArrayField"),
          col("cols.data.lastField").as("lastField"))
        .select(
          col("city"),
          explode(col("forcedArrayField.value")).as("middleFields"),
          col("lastField"))
    

    This gives the following output:

    +--------+-----------------+-------------------+
    |    city|     middleFields|          lastField|
    +--------+-----------------+-------------------+
    |someCity|[unitField1, 123]|2020-08-02T18:02:00|
    |someCity|[unitField1, 456]|2020-08-02T18:02:00|
    +--------+-----------------+-------------------+
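
    For completeness, a rough Java equivalent of the same pipeline (an untested sketch: lines and the envelope schema come from your snippet, and record is the corrected payload schema sketched above):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.functions;

    // same steps as the Scala version: strip the envelope, parse the payload,
    // then explode the two array levels
    Dataset<Row> parseValue = lines
            .selectExpr("cast (value as string) as json")
            .select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
            .select("schemaAndPayload.payload")
            .select(functions.from_json(functions.col("payload"), record).as("cols"))
            .select(functions.col("cols.data.city").as("city"),
                    functions.explode(functions.col("cols.data.forcedArrayField")).as("forcedArrayField"),
                    functions.col("cols.data.lastField").as("lastField"))
            .select(functions.col("city"),
                    functions.explode(functions.col("forcedArrayField.value")).as("middleFields"),
                    functions.col("lastField"));

    Since middleFields is still a struct, you can flatten it further with functions.col("middleFields.unit") and functions.col("middleFields.value") in a final select. Note that explode produces one output row per array element, which is why the same city and lastField appear twice in the output above.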