Tags: java, apache-spark, cassandra, spark-structured-streaming, spark-cassandra-connector

How to write to Cassandra using foreachBatch() in Java Spark?


I have the following code, and I would like to write the result into Cassandra using foreachBatch in Spark 2.4 Structured Streaming:

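    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "topic1")
            .load();

    // Kafka delivers "value" as binary; cast it to a string before splitting
    Dataset<Row> values = df.selectExpr(
            "split(CAST(value AS STRING), ',')[0] as field1",
            "split(CAST(value AS STRING), ',')[1] as field2",
            "split(CAST(value AS STRING), ',')[2] as field3",
            "split(CAST(value AS STRING), ',')[3] as field4",
            "split(CAST(value AS STRING), ',')[4] as field5");

    // TODO: write into Cassandra
    values.writeStream()
            .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
                @Override
                public void call(Dataset<Row> batchDF, Long batchId) {
                    // Transform and write batchDF
                }
            })
            .start();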
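For reference, here is a plain-Java model (no Spark involved) of what the five selectExpr expressions extract from a single Kafka message. It assumes, as I understand Spark SQL's default behaviour, that split keeps trailing empty strings (like Java's String.split(regex, -1)) and that indexing past the end of the resulting array yields NULL instead of failing. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical plain-Java model of the question's selectExpr; it does not call Spark.
class SplitFieldsDemo {

    static final int FIELD_COUNT = 5; // field1 .. field5 in the question

    // Models split(CAST(value AS STRING), ',')[i] for i = 0..4 under default settings:
    // trailing empty strings are kept (limit -1) and a missing index becomes null.
    static String[] toFields(String value) {
        String[] parts = value.split(",", -1);
        String[] fields = new String[FIELD_COUNT];
        for (int i = 0; i < FIELD_COUNT; i++) {
            fields[i] = i < parts.length ? parts[i] : null;
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toFields("a,b,c,d,e")));   // prints [a, b, c, d, e]
        System.out.println(Arrays.toString(toFields("a,b,c")));       // prints [a, b, c, null, null]
        System.out.println(Arrays.toString(toFields("a,b,c,d,e,f"))); // prints [a, b, c, d, e]
    }
}
```

So a message with fewer than five comma-separated parts produces null columns, and any parts beyond the fifth are silently dropped.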


Solution

  • Inside .foreachBatch, each micro-batch arrives as a normal (non-streaming) Dataset, so you can write it with the regular batch API of the Spark Cassandra Connector. In Java the code could look like the following (full source is here):

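    import com.google.common.collect.ImmutableMap; // Guava; any java.util.Map<String, String> works
    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    values.writeStream()
            // the cast selects the Java overload of foreachBatch; a bare lambda is ambiguous
            .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDF, batchId) ->
                    batchDF.write() // ordinary batch write of this micro-batch
                            .format("org.apache.spark.sql.cassandra")
                            .options(ImmutableMap.of("table", "sttest", "keyspace", "test"))
                            .mode(SaveMode.Append)
                            .save())
            .start();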

    Update in September 2020: support for Spark Structured Streaming was added in Spark Cassandra Connector 2.5.0.
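A minimal sketch of what that enables, assuming Spark Cassandra Connector 2.5.0 or later on the classpath and spark.cassandra.connection.host pointing at a reachable Cassandra node: the streaming Dataset is written with writeStream() and the Cassandra format directly, without foreachBatch. The class name, method name, and checkpoint directory are hypothetical; the keyspace and table are the ones used in the answer above, and the table is assumed to have columns named field1 through field5, since the connector maps Dataset columns to table columns by name.

```java
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Sketch only: assumes Spark Cassandra Connector 2.5.0+ on the classpath and
// spark.cassandra.connection.host set to a reachable Cassandra node.
class CassandraStreamingSink {

    // Hypothetical helper: streams `values` straight into test.sttest, no foreachBatch involved.
    // `throws TimeoutException` is only needed on newer Spark 3.x releases, where start()
    // declares it; declaring it is harmless when compiling against Spark 2.4.
    static StreamingQuery startCassandraSink(Dataset<Row> values, String checkpointDir)
            throws TimeoutException {
        return values.writeStream()
                .format("org.apache.spark.sql.cassandra")
                .option("checkpointLocation", checkpointDir) // where Spark keeps streaming progress
                .option("keyspace", "test")
                .option("table", "sttest")
                .start();
    }
}
```

A caller would then do something like CassandraStreamingSink.startCassandraSink(values, "/tmp/checkpoints/sttest").awaitTermination(), where the path is only an illustration. The checkpoint location is a parameter because file-based streaming sinks need one unless spark.sql.streaming.checkpointLocation is set globally.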