
I don't understand why there isn't a save/append-data operation in the last stage of my streaming job:


      df.writeStream
      .foreachBatch((batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .format("org.apache.spark.sql.cassandra")
          .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
          .mode("append")
          .save())
      .option("checkpointLocation", checkpointDir)
      .start()
      .awaitTermination()

At the end of the code, the DataFrame is written to a Cassandra table.

However, when I check the last stage in the Spark UI, there is no save/append-data step.

[screenshot: Spark UI stages]

Why is it missing, or is there something I've overlooked?

=========================== After changing my code ===========================

      .writeStream
//      .foreachBatch((batchDF: DataFrame, batchId: Long) =>
//        batchDF.write
//          .format("org.apache.spark.sql.cassandra")
//          .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
//          .mode("append")
//          .save())
      .cassandraFormat(cassandraTable, cassandraKeyspace, cassandraCluster)
      .option("checkpointLocation", checkpointDir)
      .start()
      .awaitTermination()

[screenshot: Spark UI after the change]

However, now I can see a WriteToDataSourceV2 node in the SQL tab.

[screenshot: SQL tab showing WriteToDataSourceV2]


Solution

  • Maybe this doesn't directly answer your question, but for Spark 3.0 and Spark Cassandra Connector 3.0.0 (you should use 3.0.0-beta) you shouldn't use foreachBatch; just write the data as-is by specifying the Cassandra format. Since SCC 2.5.0, Spark Structured Streaming is natively supported; see the announcement: https://www.datastax.com/blog/2020/05/advanced-apache-cassandra-analytics-now-open-all
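
    A minimal sketch of the native-write approach described above, assuming Spark 3.0 with SCC 3.0.0-beta on the classpath. The Kafka source, connection host, table, and keyspace names are placeholders for illustration; substitute your own values (e.g. the cassandraTable/cassandraKeyspace variables from the question):

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.cassandra._  // adds .cassandraFormat to writers

      val spark = SparkSession.builder()
        .appName("stream-to-cassandra")
        .config("spark.cassandra.connection.host", "127.0.0.1")  // assumed host
        .getOrCreate()

      // Assumed streaming source, used only to make the sketch self-contained.
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "events")
        .load()

      // With SCC >= 2.5.0, Structured Streaming writes to Cassandra directly,
      // with no foreachBatch wrapper needed; each micro-batch is appended.
      df.writeStream
        .cassandraFormat("table_name", "keyspace_name")
        .option("checkpointLocation", "/tmp/checkpoint")
        .outputMode("append")
        .start()
        .awaitTermination()

    This is why the WriteToDataSourceV2 node shows up in the SQL tab: the connector's streaming sink is a DataSource V2 write, so the save happens inside that node rather than as a separate save/append-data stage.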