In my Spark application, I am trying to read incoming JSON data sent through a socket. The data arrives as a string, e.g. {"deviceId": "1", "temperature":4.5}.
I created a schema as shown below:
StructType dataSchema = new StructType()
.add("deviceId", "string")
.add("temperature", "double");
I wrote the code below to extract the fields and turn them into columns, so I can use them in SQL queries.
Dataset<Row> normalizedStream = stream.select(functions.from_json(new Column("value"),dataSchema)).as("json");
Dataset<Data> test = normalizedStream.select("json.*").as(Encoders.bean(Data.class));
test.printSchema();
Data.class
public class Data {
private String deviceId;
private double temperature;
}
But when I submit the Spark app, the output schema is as below.
root
|-- from_json(value): struct (nullable = true)
| |-- deviceId: string (nullable = true)
| |-- temperature: double (nullable = true)
The from_json function call appears as the column name.
What I expect is:
root
|-- deviceId: string (nullable = true)
|-- temperature: double (nullable = true)
How can I fix this? Please let me know what I am doing wrong.
The problem is the placement of the alias. Right now, you are applying the alias to the result of select (that is, to the whole Dataset), and not to from_json, where it is supposed to be.
As a result, json.* does not do what you intend: there is no struct column called json whose children could be expanded, so the from_json(value) column comes back unchanged.
So, if you move the brackets from this:
...(new Column("value"),dataSchema)).as("json");
to this:
...(new Column("value"),dataSchema).as("json"));
your final data and schema will look like this:
+--------+-----------+
|deviceId|temperature|
+--------+-----------+
|1 |4.5 |
+--------+-----------+
root
|-- deviceId: string (nullable = true)
|-- temperature: double (nullable = true)
which is what you intend to do. Hope this helps, good luck!
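For reference, here is a minimal end-to-end sketch of the corrected select, using the schema from the question. The class name FromJsonAliasSketch, the helper flatten, and the local batch Dataset standing in for the socket stream are assumptions made only for illustration; functions.col("value") is used in place of new Column("value") and refers to the same column.

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

public class FromJsonAliasSketch {

    // Hypothetical helper: parses the "value" column as JSON, aliases the
    // resulting struct column as "json", then expands its fields.
    static Dataset<Row> flatten(Dataset<Row> input, StructType dataSchema) {
        return input
            // The alias is applied to the from_json column, inside select(...).
            .select(functions.from_json(functions.col("value"), dataSchema).as("json"))
            // "json" now names a struct column, so json.* expands its children.
            .select("json.*");
    }

    public static void main(String[] args) {
        // Assumption: a local session, used here only to make the sketch runnable.
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .appName("from-json-alias-sketch")
            .getOrCreate();

        StructType dataSchema = new StructType()
            .add("deviceId", "string")
            .add("temperature", "double");

        // Assumption: a one-row batch Dataset replaces the socket source.
        Dataset<Row> input = spark
            .createDataset(
                Arrays.asList("{\"deviceId\": \"1\", \"temperature\":4.5}"),
                Encoders.STRING())
            .toDF("value");

        flatten(input, dataSchema).printSchema();
        spark.stop();
    }
}
```

The same flatten call should apply unchanged to the streaming Dataset from the question, after which .as(Encoders.bean(Data.class)) can be chained as before.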