I have ingested an XML file using the Kafka Connect FilePulse connector 1.5.3. Now I want to read it with Spark Streaming and parse/flatten it, as it is quite nested.
The string I read out of Kafka (I used the console consumer to read it, and put a line break before the payload for illustration) looks like this:
The data types I attempted:
    import static org.apache.spark.sql.types.DataTypes.StringType;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Envelope: both parts are read as plain strings first.
    StructType schema = new StructType();
    schema = schema.add("schema", StringType, false);
    schema = schema.add("payload", StringType, false);

    StructType Data = new StructType();
    StructType ValueArray = new StructType(new StructField[]{
        new StructField("unit", StringType, true, Metadata.empty()),
        new StructField("value", StringType, true, Metadata.empty())
    });
    StructType ForcedArrayType = new StructType(new StructField[]{
        new StructField("valueArray", ValueArray, true, Metadata.empty())
    });
    Data = Data.add("city", StringType, true);
    Data = Data.add("forcedArrayField", ForcedArrayType, true);
    Data = Data.add("lastField", StringType, true);

    StructType Record = new StructType();
    Record = Record.add("data", Data, false);
The query I attempted:
    // The following worked for the payload:
    Dataset<Row> parsePayload = lines
        .selectExpr("cast (value as string) as json")
        .select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
        .select("schemaAndPayload.*"); // expose "schema" and "payload" as top-level columns
    // The following makes the output empty:
    Dataset<Row> parseValue = parsePayload
        .select(functions.from_json(functions.col("payload"), Record).as("cols"));
    StreamingQuery query = parseValue
When I output the parsePayload stream, I can see the data (still in JSON structure), but when I try to select certain or all fields, such as city above, the result is empty.
Help needed: is the data type defined incorrectly, or is the query wrong?
PS: When I output 'parsePayload' instead of 'parseValue' at the console, it displays some data, which made me think the 'payload' part worked.
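Your schema definition does not seem to be fully correct. I replicated your problem and was able to parse the JSON with the following schema. Compared with yours, forcedArrayField is an array of structs rather than a single struct, and its inner field is named value (itself an array of unit/value structs) rather than valueArray: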
    import org.apache.spark.sql.types._

    val payloadSchema = new StructType()
      .add("data", new StructType()
        .add("city", StringType)
        .add("forcedArrayField", ArrayType(new StructType()
          .add("value", ArrayType(new StructType()
            .add("unit", StringType)
            .add("value", StringType)))))
        .add("lastField", StringType))
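Since the question is written in Java, the same schema could be sketched there roughly as follows. This is only a translation of the Scala definition above, assuming Spark SQL is on the classpath; the class and method names (`PayloadSchemaJava`, `unitValue`, `payloadSchema`) are made up for illustration.

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class PayloadSchemaJava {
    // Innermost element: one {unit, value} pair, both read as strings.
    public static StructType unitValue() {
        return new StructType()
            .add("unit", DataTypes.StringType)
            .add("value", DataTypes.StringType);
    }

    // Full payload schema: data.forcedArrayField is an array of structs,
    // each holding a "value" array of {unit, value} structs.
    public static StructType payloadSchema() {
        StructType forcedItem = new StructType()
            .add("value", DataTypes.createArrayType(unitValue()));
        StructType data = new StructType()
            .add("city", DataTypes.StringType)
            .add("forcedArrayField", DataTypes.createArrayType(forcedItem))
            .add("lastField", DataTypes.StringType);
        return new StructType().add("data", data);
    }

    public static void main(String[] args) {
        // Print the schema as a tree to compare it against the JSON by eye.
        payloadSchema().printTreeString();
    }
}
```

The key difference from the schema in the question is the use of `DataTypes.createArrayType(...)` at both nesting levels instead of plain nested structs.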
To access individual fields, I then used the following selection:
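    import org.apache.spark.sql.functions
    import org.apache.spark.sql.functions.{col, explode}

    val parsePayload = df
      .selectExpr("cast (value as string) as json")
      .select(functions.from_json(functions.col("json"), schema).as("schemaAndPayload"))
      .select("schemaAndPayload.*") // expose "payload" as a top-level column
      .select(functions.from_json(functions.col("payload"), payloadSchema).as("cols"))
      .select(col("cols.data.city").as("city"), explode(col("cols.data.forcedArrayField")).as("forcedArrayField"), col("cols.data.lastField").as("lastField"))
      // the alias sits inside explode, so the exploded column keeps the default name "col"
      .select(col("city"), explode(col("forcedArrayField.value").as("middleFields")), col("lastField"))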
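This gives the following output:

    +--------+-----------------+-------------------+
    |    city|              col|          lastField|
    +--------+-----------------+-------------------+
    |someCity|[unitField1, 123]|2020-08-02T18:02:00|
    |someCity|[unitField1, 456]|2020-08-02T18:02:00|
    +--------+-----------------+-------------------+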
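For a Java job like the one in the question, the same selection could look roughly like the sketch below. It assumes Spark SQL is on the classpath and that the envelope schema (schema/payload as strings) and the payload schema are passed in. The class name `FlattenPayloadJava` and the method `flatten` are hypothetical. Unlike the Scala version, the alias is applied to the result of `explode`, and a final select splits the exploded struct into separate `unit` and `value` columns, so the column layout differs from the table above.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

public class FlattenPayloadJava {
    /**
     * Flattens Kafka records whose "value" column holds a JSON envelope
     * with a "payload" field. Both schemas are supplied by the caller.
     */
    public static Dataset<Row> flatten(Dataset<Row> lines,
                                       StructType envelopeSchema,
                                       StructType payloadSchema) {
        return lines
            .selectExpr("cast(value as string) as json")
            .select(from_json(col("json"), envelopeSchema).as("schemaAndPayload"))
            // Promote "schema" and "payload" to top-level columns.
            .select("schemaAndPayload.*")
            .select(from_json(col("payload"), payloadSchema).as("cols"))
            // First explode: one row per element of forcedArrayField.
            .select(
                col("cols.data.city").as("city"),
                explode(col("cols.data.forcedArrayField")).as("forcedArrayField"),
                col("cols.data.lastField").as("lastField"))
            // Second explode: one row per {unit, value} pair.
            .select(
                col("city"),
                explode(col("forcedArrayField.value")).as("middleField"),
                col("lastField"))
            // Pull the struct members up into plain columns.
            .select(
                col("city"),
                col("middleField.unit").as("unit"),
                col("middleField.value").as("value"),
                col("lastField"));
    }
}
```

With the streaming Dataset from the question this would be called as `flatten(lines, schema, payloadSchema)`. Each `select` contains at most one `explode`, because Spark allows only one generator per select clause.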