I have an event stream (format below) that I need to parse in Spark (Java). I am able to read the stream, but I haven't been able to find an example of converting each message to a Java bean.
{
user_id : string,
session_id : string,
event : string,
page : string,
timestamp : timestamp
}
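Java bean:
import java.io.Serializable;
import java.sql.Timestamp;

public class Event implements Serializable {
    private String user_id;
    private String session_id;
    private String page;
    private String event;
    private Timestamp timestamp;

    // Public no-arg constructor plus public getters/setters for every field go here
    // (Encoders.bean needs them, and Jackson uses them to discover the properties).
}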
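For `Encoders.bean(Event.class)` to derive a schema, Spark expects a public no-argument constructor and a public getter/setter pair for each field, and Jackson's default `ObjectMapper` also discovers private fields through those accessors. A minimal sketch of the full bean, assuming the accessor names keep the snake_case field names (`getUser_id`, `setUser_id`, ...) so the derived property names match the JSON keys `user_id`, `session_id`, etc.:

```java
import java.io.Serializable;
import java.sql.Timestamp;

// Sketch of the Event bean with the accessors that Encoders.bean and
// Jackson look for. getUser_id/setUser_id map to the property "user_id",
// so the bean properties line up with the JSON keys in the stream.
public class Event implements Serializable {
    private static final long serialVersionUID = 1L;

    private String user_id;
    private String session_id;
    private String page;
    private String event;
    private Timestamp timestamp;

    // Public no-arg constructor, used when instantiating the bean.
    public Event() {}

    public String getUser_id() { return user_id; }
    public void setUser_id(String user_id) { this.user_id = user_id; }

    public String getSession_id() { return session_id; }
    public void setSession_id(String session_id) { this.session_id = session_id; }

    public String getPage() { return page; }
    public void setPage(String page) { this.page = page; }

    public String getEvent() { return event; }
    public void setEvent(String event) { this.event = event; }

    public Timestamp getTimestamp() { return timestamp; }
    public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
}
```

Keeping the underscores in the accessor names is a design choice to avoid needing `@JsonProperty` annotations or a column rename; camelCase fields would work too, but then the JSON keys and bean property names would no longer match by default.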
Code to read the message as a String:
Dataset<String> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topics)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING());
I was able to get this to work using the following approach, which deserializes each line with Jackson:
// Parse each JSON line into an Event with Jackson; every input line yields exactly one Event.
FlatMapFunction<String, Event> linesToEvents = new FlatMapFunction<String, Event>() {
    @Override
    public Iterator<Event> call(String line) throws JsonMappingException, JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper(); // created once per record
        ArrayList<Event> eventList = new ArrayList<>();
        eventList.add(mapper.readValue(line, Event.class));
        return eventList.iterator();
    }
};
Dataset<Event> events = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topics)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .flatMap(linesToEvents, Encoders.bean(Event.class));
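An alternative to the Jackson `flatMap` is Spark's built-in `from_json` function, which parses the Kafka `value` column directly against the schema Spark derives from the bean. A minimal sketch, assuming the `Event` bean has public getters/setters and that `timestamp` arrives as an ISO-8601 string that Spark's default timestamp parsing accepts (for other formats, `from_json` has an overload that takes an options map, where `timestampFormat` can be set). The helper class and method names (`JsonToBean`, `parseJson`) are hypothetical:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Hypothetical helper: parses Kafka record values as JSON with Spark's
// from_json, using the schema derived from a Java bean class.
public final class JsonToBean {

    private JsonToBean() {}

    public static <T> Dataset<T> parseJson(Dataset<Row> kafkaRows, Class<T> beanClass) {
        Encoder<T> encoder = Encoders.bean(beanClass);
        return kafkaRows
                // Kafka delivers `value` as binary: cast it to a string, then parse it
                // against the bean-derived schema (JSON keys are matched by name).
                .select(from_json(col("value").cast("string"), encoder.schema()).as("data"))
                // Flatten the parsed struct into top-level columns.
                .select("data.*")
                // Map each row onto the bean; columns are matched to properties by name.
                .as(encoder);
    }

    // Usage with the stream from the question (`spark` and `topics` as defined there):
    //
    // Dataset<Event> events = JsonToBean.parseJson(
    //         spark.readStream()
    //              .format("kafka")
    //              .option("kafka.bootstrap.servers", "localhost:9092")
    //              .option("subscribe", topics)
    //              .load(),
    //         Event.class);
}
```

This keeps parsing inside Spark SQL, so no `ObjectMapper` is created per record, and in the default PERMISSIVE mode a malformed message produces null fields instead of throwing an exception from inside the `flatMap`.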