I am attempting to write my streaming Dataset into Cassandra.
I have a streaming Dataset of the following case class:
case class UserSession(var id: Int,
                       var visited: List[String])
I also have the following keyspace/table in Cassandra (blog = keyspace, session = table):
CREATE KEYSPACE blog WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
CREATE TABLE blog.session(id int PRIMARY KEY, visited list<text>);
I chose list<text> for visited because visited is of type List[String].
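For reference, a single-row insert into this table would look roughly like this in plain CQL (the values here are made up for illustration; CQL list literals are square brackets of single-quoted strings):

INSERT INTO blog.session (id, visited) VALUES (1, ['home', 'posts']);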
My ForeachWriter is the following:
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter

class SessionCassandraForeachWriter extends ForeachWriter[UserSession] {
  /*
   - on every batch, on every partition `partitionId`
     - on every "epoch" = chunk of data
       - call the open method; if false, skip this chunk
       - for each entry in this chunk, call the process method
       - call the close method either at the end of the chunk or with an error if it was thrown
  */
  val keyspace = "blog"
  val table = "session"
  // sparkSession is assumed to be in scope here (e.g. the active SparkSession)
  val connector = CassandraConnector(sparkSession.sparkContext.getConf)
  override def open(partitionId: Long, epochId: Long): Boolean = {
    println("Open connection")
    true
  }
  override def process(sess: UserSession): Unit = {
    connector.withSessionDo { session =>
      // The column list must match the values, and string list elements need
      // quoting - naively done here; see the prepared-statement sketch after the class.
      val visitedLiteral = sess.visited.map(v => s"'$v'").mkString("[", ", ", "]")
      session.execute(
        s"""
           |insert into $keyspace.$table (id, visited)
           |values (${sess.id}, $visitedLiteral)
        """.stripMargin)
    }
  }
  override def close(errorOrNull: Throwable): Unit = println("Closing connection")
}
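As an aside, a safer variant of process would use a prepared statement instead of string interpolation, so the driver handles quoting and type conversion. This is only a sketch, assuming SCC 2.5+/3.0 (where withSessionDo hands you a Java driver CqlSession) and Scala 2.13 for CollectionConverters:

import scala.jdk.CollectionConverters._

override def process(sess: UserSession): Unit = {
  connector.withSessionDo { session =>
    // Prepared on every call for brevity; in practice, cache the PreparedStatement.
    val prepared = session.prepare(s"insert into $keyspace.$table (id, visited) values (?, ?)")
    // The Java driver expects Java types: box the Int, convert the Scala List to a java.util.List.
    session.execute(prepared.bind(Int.box(sess.id), sess.visited.asJava))
  }
}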
It may help to look at my process function, since that may be what's throwing the error. My main is the following.
finishedUserSessionsStream: Dataset[UserSession]
def main(args: Array[String]): Unit = {
  // build finishedUserSessionsStream ...
  finishedUserSessionsStream.writeStream
    .option("checkpointLocation", "checkpoint")
    .foreach(new SessionCassandraForeachWriter)
    .start()
    .awaitTermination()
}
This gives me the following error:
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)
For Spark 3.0 & Spark Cassandra Connector 3.0.0 you shouldn't use foreach
- it was a workaround for SCC < 2.5.0, which didn't have native support for writing streaming datasets. Starting with SCC 2.5.0, you can just write data directly to Cassandra, like this (here is a full example):
import org.apache.spark.sql.streaming.OutputMode

val query = streamingCountsDF.writeStream
  .outputMode(OutputMode.Update)
  .format("org.apache.spark.sql.cassandra")
  .option("checkpointLocation", "checkpoint")
  .option("keyspace", "ks")
  .option("table", "table")
  .start()
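Adapted to the keyspace/table from the question, that would look something like the sketch below (assuming finishedUserSessionsStream's columns line up with blog.session; Append mode is used here since the stream isn't described as an aggregation):

import org.apache.spark.sql.streaming.OutputMode

val query = finishedUserSessionsStream.writeStream
  .outputMode(OutputMode.Append) // Update/Complete would apply to aggregated streams
  .format("org.apache.spark.sql.cassandra")
  .option("checkpointLocation", "checkpoint")
  .option("keyspace", "blog")
  .option("table", "session")
  .start()

query.awaitTermination()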
You also need to switch to SCC 3.0.0-beta, which contains a lot of fixes.