apache-spark, cassandra, spark-structured-streaming, spark-cassandra-connector

How can I use two different Cassandra clusters in my Spark Structured Streaming job?


I have two Cassandra clusters. How do I set both hosts, usernames, and passwords in the same SparkSession? And how do I use them with a CassandraConnector?

I tried this:

import com.datastax.oss.driver.api.core.cql.ResultSet
import com.datastax.spark.connector.cql.CassandraConnector

// conf is a SparkConf carrying the spark.cassandra.* connection settings
val cassandraCon: CassandraConnector = CassandraConnector(conf)

val ks = "monitore"
val tableName = "validate_structure"

def getIndex(): ResultSet = {
  val table = ks + "." + tableName
  val query = s"""select *
                 |from ${table}""".stripMargin
  println(query)
  // Opens a session (on the driver) and executes the query there
  cassandraCon.withSessionDo(s => {
    s.execute(query)
  })
}

However, the problem is that this only works if the Cassandra cluster is on the same host as Spark. I also tried to create a catalog, but I couldn't find a way to make the request with session.execute instead of spark.sql.

Can someone help me? I use Spark Structured Streaming 3.1.2, and I use the Cassandra connection to enrich my data.


Solution

  • The first problem is that .withSessionDo runs in the context of the driver only, not of the executors, so the work won't be distributed.

    You need to use:

    • either the .joinWithCassandraTable function from the RDD API (there is also a left-join version of it); see the sketch after this list
    • or the so-called DirectJoin (see details in the blog post by its author), where the Spark Cassandra Connector (SCC) detects that one side of a join is in Cassandra and converts the join into queries against individual partitions. Unfortunately, Spark 3.1 broke the current version of DirectJoin in SCC (see JIRA), so you may need to use the RDD API until it's fixed.
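
    For illustration, here is a minimal sketch of the RDD approach. It assumes SCC 3.x and reuses the keyspace and table from the question; the host name and lookup keys are placeholders, and the table is assumed to have a single text partition key column:

    import com.datastax.spark.connector._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("cassandra-enrichment-sketch")
      .config("spark.cassandra.connection.host", "cassandra-host") // placeholder
      .getOrCreate()

    // Keys to enrich; each Tuple1 maps positionally to the partition key column.
    val keys = spark.sparkContext.parallelize(Seq(Tuple1("key-1"), Tuple1("key-2")))

    // joinWithCassandraTable runs the lookups on the executors, so the join is
    // distributed instead of being funnelled through a driver-side session.
    val enriched = keys.joinWithCassandraTable("monitore", "validate_structure")

    enriched.collect().foreach { case (key, row) => println(s"$key -> $row") }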

    I have a detailed blog post on how to perform efficient joins with data in Cassandra.

    Regarding the two clusters: it's entirely possible to specify connection details for individual read/write operations; just specify .option("spark.cassandra.connection.host", "host-or-ip"). (Russell Spitzer, the primary developer of SCC, has blog posts on how to connect to multiple clusters.) It's also possible when you're using the Catalog API: just append the connection property name, such as spark.cassandra.connection.host, to the specific catalog name (see docs). A sketch of both approaches follows.
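
    Here is a hedged sketch of both options, assuming SCC 3.x; the hosts, credentials, and catalog names (cluster1, cluster2) are placeholders:

    import com.datastax.spark.connector.cql.CassandraConnector
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("two-cassandra-clusters-sketch")
      // Catalog API: append spark.cassandra.* properties to each catalog name,
      // so every catalog carries its own host and credentials.
      .config("spark.sql.catalog.cluster1", "com.datastax.spark.connector.datasource.CassandraCatalog")
      .config("spark.sql.catalog.cluster1.spark.cassandra.connection.host", "host1")
      .config("spark.sql.catalog.cluster1.spark.cassandra.auth.username", "user1")
      .config("spark.sql.catalog.cluster1.spark.cassandra.auth.password", "pass1")
      .config("spark.sql.catalog.cluster2", "com.datastax.spark.connector.datasource.CassandraCatalog")
      .config("spark.sql.catalog.cluster2.spark.cassandra.connection.host", "host2")
      .config("spark.sql.catalog.cluster2.spark.cassandra.auth.username", "user2")
      .config("spark.sql.catalog.cluster2.spark.cassandra.auth.password", "pass2")
      .getOrCreate()

    // DataFrame API: override connection details on an individual read (or write).
    val fromCluster2 = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "host2")
      .option("keyspace", "monitore")        // keyspace/table from the question
      .option("table", "validate_structure")
      .load()

    // A separate CassandraConnector per cluster: clone the Spark conf, override
    // the connection settings, then use each connector independently.
    val confB = spark.sparkContext.getConf.clone()
      .set("spark.cassandra.connection.host", "host2")
      .set("spark.cassandra.auth.username", "user2")
      .set("spark.cassandra.auth.password", "pass2")
    val connectorB = CassandraConnector(confB)

    With the catalogs registered, spark.sql("select * from cluster1.monitore.validate_structure") reads from the first cluster, and connectorB.withSessionDo(...) talks to the second.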