apache-spark cassandra spark-structured-streaming spark-cassandra-connector

Spark Streaming Dataset Cassandra Connection UnsupportedOperationChecker


I am attempting to write my Streaming Dataset into Cassandra.

My streaming Dataset has the following element type:

case class UserSession(var id: Int,
                       var visited: List[String]
                      )

I also have the following keyspace and table in Cassandra (blog is the keyspace, session is the table):

CREATE KEYSPACE blog WITH REPLICATION = { 'class' : 'SimpleStrategy',    'replication_factor' : 1 };


CREATE TABLE blog.session(id int PRIMARY KEY, visited list<text>);

I chose list<text> for visited because the visited field has type List[String].

My foreach writer is as follows:

class SessionCassandraForeachWriter extends ForeachWriter[UserSession] {

/*
  - on every batch, on every partition `partitionId`
    - on every "epoch" = chunk of data
      - call the open method; if false, skip this chunk
      - for each entry in this chunk, call the process method
      - call the close method either at the end of the chunk or with an error if it was thrown
 */

val keyspace = "blog"
val table = "session"
val connector = CassandraConnector(sparkSession.sparkContext.getConf)

override def open(partitionId: Long, epochId: Long): Boolean = {
  println("Open connection")
  true
}

override def process(sess: UserSession): Unit = {
  connector.withSessionDo { session =>
    session.execute(
      s"""
         |insert into $keyspace.$table("id")
         |values (${sess.id},${sess.visited})
       """.stripMargin)
  }
}

override def close(errorOrNull: Throwable): Unit = println("Closing connection")

 }

It may help to look at my process function, since that may be what throws the error. My main is as follows:

finishedUserSessionsStream: Dataset[UserSession]

def main(args: Array[String]): Unit = {
// ... build finishedUserSessionsStream ...

finishedUserSessionsStream.writeStream
      .option("checkpointLocation", "checkpoint")
      .foreach(new SessionCassandraForeachWriter)
      .start()
      .awaitTermination()

}

This gives me the following error:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)


Solution

  • With Spark 3.0 and Spark Cassandra Connector (SCC) 3.0.0 you shouldn't use foreach. It was a workaround for SCC versions before 2.5.0, which had no native support for writing streaming datasets. Starting with SCC 2.5.0, you can write the stream directly to Cassandra, like this:

        val query = streamingCountsDF.writeStream
          .outputMode(OutputMode.Update)
          .format("org.apache.spark.sql.cassandra")
          .option("checkpointLocation", "checkpoint")
          .option("keyspace", "ks")
          .option("table", "table")
          .start()
    

    You also need to switch to SCC 3.0.0-beta, which contains a lot of fixes.
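
Separately from the UnsupportedOperationChecker error, the CQL built in the question's process method names only the id column while supplying two values, and interpolating a Scala List into the string yields text like List(a, b) rather than a CQL list literal. If the foreach approach were kept, the statement could be built along these lines. This is a minimal sketch: the CqlInsert object and its helper names are hypothetical and not part of the connector, and a prepared statement with bound values would normally be preferable to building the string by hand.

```scala
// Hypothetical helper (not part of Spark or the Cassandra connector):
// builds the INSERT text for a table with columns (id int, visited list<text>).
object CqlInsert {

  // Quote a string as a CQL text literal; embedded single quotes are doubled.
  def textLiteral(s: String): String =
    "'" + s.replace("'", "''") + "'"

  // Render a Scala list as a CQL list<text> literal, e.g. ['a', 'b'].
  def listLiteral(values: List[String]): String =
    values.map(textLiteral).mkString("[", ", ", "]")

  // Name both columns so the column list matches the two supplied values.
  def insertStatement(keyspace: String, table: String, id: Int, visited: List[String]): String =
    s"INSERT INTO $keyspace.$table (id, visited) VALUES ($id, ${listLiteral(visited)})"
}
```

For example, CqlInsert.insertStatement("blog", "session", 1, List("/home")) produces INSERT INTO blog.session (id, visited) VALUES (1, ['/home']), which could then be passed to session.execute inside process.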