Tags: apache-spark, apache-kafka, cassandra, spark-structured-streaming, spark-cassandra-connector

Spark Streaming: Convert Dataset&lt;Row&gt; to Dataset&lt;CustomObject&gt; in Java


I've recently started working with Apache Spark and came across a requirement where I need to read a Kafka stream and feed the data into Cassandra. While doing so I ran into an issue: Structured Streaming is Dataset/SQL based, while the Cassandra connector API I was using works on RDDs (I may be wrong here, please do correct me), so I struggled to get this working. I made it work for now, but I'm not sure it's the right way to implement it.

Below is the code

Schema

StructType getSchema() {
    StructField[] structFields = new StructField[]{
            new StructField("id", DataTypes.LongType, true, Metadata.empty()),
            new StructField("name", DataTypes.StringType, true, Metadata.empty()),
            new StructField("cat", DataTypes.StringType, true, Metadata.empty()),
            new StructField("tag", DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty())
    };
    return new StructType(structFields);
}

Stream reader

Dataset<Row> results = kafkaDataset.select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), getSchema()).as("value"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp"),
        col("timestampType"));

results.select("value.*")
        .writeStream()
        .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
            @Override
            public void call(Dataset<Row> dataset, Long batchId) throws Exception {
                ObjectMapper mapper = new ObjectMapper();
                List<DealFeedSchema> list = new ArrayList<>();
                // pull the whole micro-batch onto the driver and map each Row to a bean via JSON
                List<Row> rowList = dataset.collectAsList();
                if (!rowList.isEmpty()) {
                    rowList.forEach(row -> {
                        if (row == null) logger.info("Null DataSet");
                        else {
                            try {
                                list.add(mapper.readValue(row.json(), DealFeedSchema.class));
                            } catch (JsonProcessingException e) {
                                logger.error("error parsing Data", e);
                            }
                        }
                    });
                    // redistribute the beans as an RDD so the RDD-based connector API can save them
                    JavaRDD<DealFeedSchema> rdd = new JavaSparkContext(session.sparkContext()).parallelize(list);
                    javaFunctions(rdd).writerBuilder(Constants.CASSANDRA_KEY_SPACE,
                            Constants.CASSANDRA_DEAL_TABLE_SPACE, mapToRow(DealFeedSchema.class)).saveToCassandra();
                }
            }
        })
        .start()
        .awaitTermination();

Although this works fine, I need to know if there's a better way to do this. If there is, please let me know how to achieve it.

Thanks in advance. For those who are looking for a way, you can refer to this code as an alternative. :)


Solution

  • To convert Dataset&lt;Row&gt; to Dataset&lt;DealFeedSchema&gt; in Java:

    1. Java Bean for DealFeedSchema

    
    import java.util.List;
    
    public class DealFeedSchema {
        private long id;
        private String name;
        private String cat;
        private List<String> tag;

        public long getId() {
            return id;
        }
    
        public void setId(long id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getCat() {
            return cat;
        }
    
        public void setCat(String cat) {
            this.cat = cat;
        }
    
        public List<String> getTag() {
            return tag;
        }
    
        public void setTag(List<String> tag) {
            this.tag = tag;
        }
    }
    

    2. Load the test data

        Dataset<Row> dataFrame = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1L, "foo", "cat1", Arrays.asList("tag1", "tag2"))
        ), getSchema());
            dataFrame.show(false);
            dataFrame.printSchema();
            /**
             * +---+----+----+------------+
             * |id |name|cat |tag         |
             * +---+----+----+------------+
             * |1  |foo |cat1|[tag1, tag2]|
             * +---+----+----+------------+
             *
             * root
             *  |-- id: long (nullable = true)
             *  |-- name: string (nullable = true)
             *  |-- cat: string (nullable = true)
             *  |-- tag: array (nullable = true)
             *  |    |-- element: string (containsNull = true)
             */
    

    3. Convert Dataset<Row> to Dataset<DealFeedSchema>

            Dataset<DealFeedSchema> dealFeedSchemaDataset = dataFrame.as(Encoders.bean(DealFeedSchema.class));
            dealFeedSchemaDataset.show(false);
            dealFeedSchemaDataset.printSchema();
            /**
             * +---+----+----+------------+
             * |id |name|cat |tag         |
             * +---+----+----+------------+
             * |1  |foo |cat1|[tag1, tag2]|
             * +---+----+----+------------+
             *
             * root
             *  |-- id: long (nullable = true)
             *  |-- name: string (nullable = true)
             *  |-- cat: string (nullable = true)
             *  |-- tag: array (nullable = true)
             *  |    |-- element: string (containsNull = true)
             */
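
As a follow-up to step 3: once the bean encoder is in place, the original stream no longer needs the collectAsList/parallelize round-trip at all, because each micro-batch handed to foreachBatch is a plain Dataset that the connector's DataFrame support can save directly. Below is a minimal sketch of the streaming side, reusing results and Constants from the question; it assumes the spark-cassandra-connector's org.apache.spark.sql.cassandra data source is on the classpath and that the bean's field names match the Cassandra table's columns.

    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SaveMode;

    // convert the stream to typed beans once, up front
    Dataset<DealFeedSchema> deals = results.select("value.*")
            .as(Encoders.bean(DealFeedSchema.class));

    deals.writeStream()
            .foreachBatch((VoidFunction2<Dataset<DealFeedSchema>, Long>) (batch, batchId) ->
                    // each micro-batch is saved through the DataFrame-based Cassandra
                    // writer, so nothing is collected to the driver
                    batch.write()
                            .format("org.apache.spark.sql.cassandra")
                            .option("keyspace", Constants.CASSANDRA_KEY_SPACE)
                            .option("table", Constants.CASSANDRA_DEAL_TABLE_SPACE)
                            .mode(SaveMode.Append)
                            .save())
            .start()
            .awaitTermination();

The cast to VoidFunction2 disambiguates the Java overload of foreachBatch. The typed conversion is optional for the write itself; it only matters if you need DealFeedSchema objects for business logic before saving.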