I have two Cassandra clusters. How do I set both hosts, usernames and passwords in the same SparkSession? And how do I use them with a CassandraConnector?
I tried this:
import com.datastax.oss.driver.api.core.cql.ResultSet
import com.datastax.spark.connector.cql.CassandraConnector

// conf is the SparkConf of the running application
val cassandraCon: CassandraConnector = CassandraConnector(conf)

val ks = "monitore"
val tableName = "validate_structure"

def getIndex(): ResultSet = {
  val table = ks + "." + tableName
  val query = s"""select *
                 |from ${table}""".stripMargin
  println(query)
  cassandraCon.withSessionDo(s => {
    s.execute(query)
  })
}
However, the problem is that this only works if the Cassandra cluster is on the same host as Spark. I also tried to create a catalog, but I couldn't find a way to run the query with session.execute instead of spark.sql.
Can someone help me? I use Spark Structured Streaming 3.1.2 and use the Cassandra connection to enrich my data.
The first problem is that .withSessionDo runs in the context of the driver only, not on the executors, so the lookups won't be distributed. You need to use the joinWithCassandraTable function from the RDD API instead (there is also a left-join version of it, leftJoinWithCassandraTable). I have a detailed blog post on how to perform efficient joins with data in Cassandra.
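To illustrate, here is a minimal sketch of such a join; the RDD of keys and the primary-key column name "id" are assumptions for the example, so adjust them to your table's real schema:

import com.datastax.spark.connector._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val sc = spark.sparkContext

// Keys to enrich; in a real job this RDD would come from your input data.
val keys = sc.parallelize(Seq(Tuple1("id-1"), Tuple1("id-2")))

// The join runs on the executors, querying Cassandra only for the
// partitions matching the keys, instead of pulling the whole table.
val joined = keys
  .joinWithCassandraTable("monitore", "validate_structure")
  .on(SomeColumns("id"))

// Left-join variant: keys without a match are kept, with None on the right side.
val leftJoined = keys
  .leftJoinWithCassandraTable("monitore", "validate_structure")
  .on(SomeColumns("id"))

joined.collect().foreach { case (key, row) => println(s"$key -> $row") }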
Regarding the two clusters: it's completely possible to specify connection details for individual read/write operations, just specify .option("spark.cassandra.connection.host", "host-or-ip") (Russell Spitzer, primary developer of SCC, has blog posts on how to connect to multiple clusters). It is also possible when you're using the Catalog API: just append the connection property name, like spark.cassandra.connection.host, to the specific catalog name (see docs).
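Putting that together, here is a minimal sketch of wiring two clusters into one SparkSession; the host names, credentials, and the catalog names cluster1/cluster2 are placeholders, so substitute your own values:

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// 1) Per-operation options: each read/write carries its own connection details.
val df1 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "keyspace"                        -> "monitore",
    "table"                           -> "validate_structure",
    "spark.cassandra.connection.host" -> "cluster1-host",
    "spark.cassandra.auth.username"   -> "user1",
    "spark.cassandra.auth.password"   -> "password1"))
  .load()

// 2) Separate CassandraConnector instances, each built from a SparkConf clone
//    that overrides host and credentials (the approach described in Russell
//    Spitzer's posts), if you want to keep using session.execute.
val baseConf = spark.sparkContext.getConf
val connectorCluster1 = CassandraConnector(baseConf.clone()
  .set("spark.cassandra.connection.host", "cluster1-host")
  .set("spark.cassandra.auth.username", "user1")
  .set("spark.cassandra.auth.password", "password1"))
val connectorCluster2 = CassandraConnector(baseConf.clone()
  .set("spark.cassandra.connection.host", "cluster2-host")
  .set("spark.cassandra.auth.username", "user2")
  .set("spark.cassandra.auth.password", "password2"))

// 3) Catalog API: one catalog per cluster, with connector properties appended
//    to the catalog name.
spark.conf.set("spark.sql.catalog.cluster1",
  "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.cluster1.spark.cassandra.connection.host", "cluster1-host")
spark.conf.set("spark.sql.catalog.cluster1.spark.cassandra.auth.username", "user1")
spark.conf.set("spark.sql.catalog.cluster1.spark.cassandra.auth.password", "password1")

spark.conf.set("spark.sql.catalog.cluster2",
  "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.cluster2.spark.cassandra.connection.host", "cluster2-host")
spark.conf.set("spark.sql.catalog.cluster2.spark.cassandra.auth.username", "user2")
spark.conf.set("spark.sql.catalog.cluster2.spark.cassandra.auth.password", "password2")

// Tables are then addressed as <catalog>.<keyspace>.<table>:
val fromCluster2 = spark.read.table("cluster2.monitore.validate_structure")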