Tags: apache-spark, apache-spark-sql, spark-streaming

Parsing JSON message in Spark from Kafka stream


I have an event stream (format below) that I need to parse in Spark (Java). I am able to read the stream, but I haven't been able to find an example that converts each message to a Java bean.

{
    user_id  : string,
    session_id : string,
    event : string,
    page : string,
    timestamp : timestamp
}

Java Bean

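import java.io.Serializable;
import java.sql.Timestamp;

public class Event implements Serializable {
    // Encoders.bean and Jackson both need a public getter/setter pair per field
    private String user_id, session_id, page, event;
    private Timestamp timestamp;
    public String getUser_id() { return user_id; }
    public void setUser_id(String user_id) { this.user_id = user_id; }
    public String getSession_id() { return session_id; }
    public void setSession_id(String session_id) { this.session_id = session_id; }
    public String getPage() { return page; }
    public void setPage(String page) { this.page = page; }
    public String getEvent() { return event; }
    public void setEvent(String event) { this.event = event; }
    public Timestamp getTimestamp() { return timestamp; }
    public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
}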

Code to read each message as a String:

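import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// The Kafka source delivers key and value as binary; cast the value to a string
Dataset<String> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topics)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING());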
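To confirm that the raw strings actually arrive before any parsing is added, the `lines` Dataset can be started against Spark's built-in console sink. This is a minimal debugging sketch, not part of the original question; `ConsoleDebug` and `printRaw` are hypothetical names. It relies on the fact that a streaming Dataset reads nothing until `writeStream().start()` is called.

```java
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.streaming.StreamingQuery;

public class ConsoleDebug {

    // Starts a streaming query that prints every micro-batch of raw messages
    // to stdout. Spark reads nothing from the source until start() is called.
    public static StreamingQuery printRaw(Dataset<String> lines) throws TimeoutException {
        return lines
                .writeStream()
                .format("console")
                .outputMode("append")
                .option("truncate", "false") // print full JSON payloads instead of cutting them off
                .start();
    }
}
```

In a standalone driver, call `awaitTermination()` on the returned query so that `main` does not exit while the stream is running.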

Solution

I was able to get this working with the following approach:

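import java.util.Collections;
import java.util.Iterator;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

FlatMapFunction<String, Event> linesToEvents = new FlatMapFunction<String, Event>() {
    // Created once with the function and serialized along with it
    // (ObjectMapper is Serializable), instead of once per record
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Iterator<Event> call(String line) throws Exception {
        // Each Kafka message holds exactly one JSON event
        return Collections.singletonList(mapper.readValue(line, Event.class)).iterator();
    }
};

Dataset<Event> events = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topics)
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .flatMap(linesToEvents, Encoders.bean(Event.class));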
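For comparison, the same conversion can be done without Jackson: Spark's built-in `from_json` parses the string column against the schema that `Encoders.bean` derives from the bean, and `as(...)` then maps the resulting columns onto `Event` by name. This is a sketch of an alternative, not the approach used above; `EventParsing` and `parseEvents` are hypothetical names, the input column is assumed to be called `value` (which is what `CAST(value AS STRING)` produces), and `Event` is re-declared as a nested class with getters and setters so the block compiles on its own.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import java.io.Serializable;
import java.sql.Timestamp;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public class EventParsing {

    // Same shape as the Event bean above; Encoders.bean needs a public
    // no-arg constructor plus a getter/setter pair for every field.
    public static class Event implements Serializable {
        private String user_id, session_id, page, event;
        private Timestamp timestamp;

        public String getUser_id() { return user_id; }
        public void setUser_id(String v) { user_id = v; }
        public String getSession_id() { return session_id; }
        public void setSession_id(String v) { session_id = v; }
        public String getPage() { return page; }
        public void setPage(String v) { page = v; }
        public String getEvent() { return event; }
        public void setEvent(String v) { event = v; }
        public Timestamp getTimestamp() { return timestamp; }
        public void setTimestamp(Timestamp v) { timestamp = v; }
    }

    // Parses the "value" column of JSON strings into Event beans with Spark's
    // built-in JSON parser. The same call works on the streaming Dataset read
    // from Kafka and on an ordinary batch Dataset.
    public static Dataset<Event> parseEvents(Dataset<String> json) {
        Encoder<Event> eventEncoder = Encoders.bean(Event.class);
        return json
                .select(from_json(col("value"), eventEncoder.schema()).alias("e"))
                .select("e.*")        // flatten the parsed struct into top-level columns
                .as(eventEncoder);    // map columns to bean properties by name
    }
}
```

Applied to the stream from the question, this would be `parseEvents(lines)`. Timestamps are parsed with Spark's default `timestampFormat`, so messages that use another format need the `from_json` overload that also takes an options map. In the default mode a malformed message yields an `Event` whose fields are null instead of failing the query, whereas `mapper.readValue` throws and fails the task.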