How to read streaming nested JSON from Kafka in Spark using Java

I'm trying to read complex nested JSON data from Kafka in Spark using Java, and I'm having trouble forming the Dataset.

Actual JSON records sent to Kafka:

{"sample_title": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}
{"sample_title2": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}
{"sample_title3": {"txn_date": "2019-01-10","timestamp": "2019-02-01T08:57:18.100Z","txn_type": "TBD","txn_rcvd_time": "01/04/2019 03:32:32.135","txn_ref": "Test","txn_status": "TEST"}}
Code that reads the stream:

Dataset<Row> df = spark.readStream().format("kafka")
                    .option("spark.local.dir", config.getString(PropertyKeys.SPARK_APPLICATION_TEMP_LOCATION.getCode()))
                    .option("subscribe", config.getString(PropertyKeys.KAFKA_TOPIC_IPE_STP.getCode()))
                    .option("startingOffsets", "earliest")
                    .option("kafka.security.protocol", config.getString(PropertyKeys.SECURITY_PROTOCOL.getCode()))
                    .option("kafka.ssl.key.password", config.getString(PropertyKeys.SSL_KEY_PASSWORD.getCode())).load()
                    .selectExpr("CAST(key AS STRING)",
                            "CAST(value AS STRING)",
                            "topic as topic",
                            "partition as partition","offset as offset",
                            "timestamp as timestamp",
                            "timestampType as timestampType");

Dataset<String> output = df.selectExpr("CAST(value AS STRING)").as(Encoders.STRING()).filter((FilterFunction<String>) x -> x.contains("sample_title"));

Since the input can contain multiple schemas, the code should handle that: filter the records by title and map the matching ones to a Dataset of type Title.

public class Title implements Serializable {
    String txn_date;
    Timestamp timestamp;
    String txn_type;
    String txn_rcvd_time;
    String txn_ref;
    String txn_status;
}

  • First, make Title a Java bean class, i.e. write a getter and a setter for each field.

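        public class Title implements Serializable {
            String txn_date;
            Timestamp timestamp;
            String txn_type;
            String txn_rcvd_time;
            String txn_ref;
            String txn_status;
            public Title() {} // no-arg constructor, required by Encoders.bean
            public Title(String data) { ... /* set values for fields with the data */ }
            // add all getters and setters for fields
        }

        // Read value as a String so it can be passed to Title(String).
        // The casts tell the Java compiler which map/filter overload to use.
        Dataset<Title> resultdf = df.selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING())
            .map((MapFunction<String, Title>) value -> new Title(value), Encoders.bean(Title.class));
        resultdf.filter((FilterFunction<Title>) title -> /* apply any predicate on title */);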
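The body of `Title(String data)` is left open above. As a minimal sketch only, here is one way it could be filled in using nothing but the JDK. It assumes every field arrives as a flat `"key": "string"` pair without escaped quotes, as in the sample records, and that `timestamp` is always ISO-8601. The regex approach is an assumption made to keep the example dependency-free; a real JSON library is the safer choice in production. The class is package-private here so the snippet compiles on its own; for Spark it would normally be `public`.

```java
import java.io.Serializable;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the Title bean with a hypothetical parsing constructor.
class Title implements Serializable {
    // Assumption: fields are "key": "string" pairs with no escaped quotes.
    // The wrapper key (e.g. "sample_title") is followed by '{', so it is skipped.
    private static final Pattern FIELD =
            Pattern.compile("\"(\\w+)\"\\s*:\\s*\"([^\"]*)\"");

    private String txn_date;
    private Timestamp timestamp;
    private String txn_type;
    private String txn_rcvd_time;
    private String txn_ref;
    private String txn_status;

    // Bean encoders need a no-argument constructor.
    public Title() { }

    // Fills the fields from one Kafka record value.
    public Title(String data) {
        Matcher m = FIELD.matcher(data);
        while (m.find()) {
            String value = m.group(2);
            switch (m.group(1)) {
                case "txn_date":      txn_date = value; break;
                // Instant.parse throws if the value is not ISO-8601.
                case "timestamp":     timestamp = Timestamp.from(Instant.parse(value)); break;
                case "txn_type":      txn_type = value; break;
                case "txn_rcvd_time": txn_rcvd_time = value; break;
                case "txn_ref":       txn_ref = value; break;
                case "txn_status":    txn_status = value; break;
                default:              break; // ignore unknown keys
            }
        }
    }

    public String getTxn_date() { return txn_date; }
    public void setTxn_date(String v) { txn_date = v; }
    public Timestamp getTimestamp() { return timestamp; }
    public void setTimestamp(Timestamp v) { timestamp = v; }
    public String getTxn_type() { return txn_type; }
    public void setTxn_type(String v) { txn_type = v; }
    public String getTxn_rcvd_time() { return txn_rcvd_time; }
    public void setTxn_rcvd_time(String v) { txn_rcvd_time = v; }
    public String getTxn_ref() { return txn_ref; }
    public void setTxn_ref(String v) { txn_ref = v; }
    public String getTxn_status() { return txn_status; }
    public void setTxn_status(String v) { txn_status = v; }
}
```

Keys that are not listed in the `switch` are ignored, so a record with extra fields still yields a `Title` with the known fields set and the rest left `null`.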

    If you want to filter the data first and then apply the encoding:

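        // requires: import static org.apache.spark.sql.functions.*;
        df.selectExpr("CAST(value AS STRING)")
            .filter(get_json_object(col("value"), "$.sample_title").isNotNull())
            .as(Encoders.STRING())
            // for a simple filter, use .filter((FilterFunction<String>) t -> t.contains("sample_title")) here instead
            .map((MapFunction<String, Title>) value -> new Title(value), Encoders.bean(Title.class));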
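One thing to watch with the simple filter: `contains("sample_title")` is a substring test, so it also accepts the `sample_title2` and `sample_title3` records, whereas the `$.sample_title` path only matches that exact key. If the filter has to stay on plain strings, a small helper along these lines could compare the top-level key exactly. This is a sketch, not part of Spark: `TitleKeys`, `topLevelKey`, and `hasTitle` are hypothetical names, and it assumes each record is a JSON object whose first key is the title.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for routing records by their top-level key.
final class TitleKeys {
    // Captures the first quoted key after the opening brace of the record.
    private static final Pattern TOP_KEY =
            Pattern.compile("^\\s*\\{\\s*\"([^\"]+)\"\\s*:");

    private TitleKeys() { }

    // Returns the first top-level key, or empty if the text does not look like an object.
    static Optional<String> topLevelKey(String json) {
        if (json == null) {
            return Optional.empty();
        }
        Matcher m = TOP_KEY.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // True only when the record's top-level key equals the given title exactly.
    static boolean hasTitle(String json, String title) {
        return topLevelKey(json).map(title::equals).orElse(false);
    }
}
```

On a `Dataset<String>` it would be used as `.filter((FilterFunction<String>) v -> TitleKeys.hasTitle(v, "sample_title"))`, with one such filter per schema.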