Search code examples
azureazure-cosmosdbazure-databricksspark-cassandra-connectorazure-cosmosdb-cassandra-api

Abstract Method Exception when connecting to Azure Cosmos DB Cassandra API table using Azure databricks job


I am trying to write data to Cassandra table (cosmos DB) via Azure DBR job (spark streaming). Getting below exception:

Query [id = , runId = ] terminated with exception: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract`

`Caused by: IOException: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: AbstractMethodError: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract`

What I did to get here:

  • created cosmos DB account
  • created cassandra keyspace
  • created cassandra table
  • created DBR job
  • added com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 to the job cluster
  • added com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 to the job cluster

What I tried:

different versions of connectors or azure cosmos db helper libraries, but some or the other ClassNotFoundExceptions or MethodNotFound errors

Code Snippet:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra._
import java.time.LocalDateTime

def writeDelta(spark:SparkSession,dataFrame: DataFrame,sourceName: String,checkpointLocation: String,dataPath: String,loadType: String,log: Logger): Boolean = {
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
    spark.conf.set("spark.cassandra.connection.localConnectionsPerExecutor", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keepAliveMS", "60000000") //Increase this number as needed
    spark.conf.set("spark.cassandra.output.ignoreNulls","true")
    spark.conf.set("spark.cassandra.connection.host", "*******.cassandra.cosmosdb.azure.com")
    spark.conf.set("spark.cassandra.connection.port", "10350")
    spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
    // spark.cassandra.auth.username and password are set in cluster conf
    
    val write=dataFrame.writeStream.
              format("org.apache.spark.sql.cassandra").
              options(Map( "table" -> "****", "keyspace" -> "****")).
              foreachBatch(upsertToDelta _).
              outputMode("update").
              option("mergeSchema", "true").
              option("mode","PERMISSIVE").
              option("checkpointLocation", checkpointLocation).
              start()
            write.awaitTermination()
}

  def upsertToDelta(newBatch: DataFrame, batchId: Long) {

    try {
      val spark = SparkSession.active
      println(LocalDateTime.now())
      println("BATCH ID = "+batchId+" REC COUNT = "+newBatch.count())
      newBatch.persist()
      val userWindow = Window.partitionBy(keyColumn).orderBy(col(timestampCol).desc)
      val deDup = newBatch.withColumn("rank", row_number().over(userWindow)).where(col("rank") === 1).drop("rank")
    
      deDup.write
        .format("org.apache.spark.sql.cassandra")
        .options(Map( "table" -> "****", "keyspace" -> "****"))
        .mode("append")
        .save()

      newBatch.unpersist()
    } catch {
      case e: Exception =>
        throw e
    }
  }

############################

After implementing solution suggested by @theo-van-kraay, Getting error in executor's logs (Job keeps on running even after this error)

23/02/13 07:28:55 INFO CassandraConnector: Connected to Cassandra cluster.
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Committed partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO Executor: Finished task 9.0 in stage 6.0 (TID 26). 1511 bytes result sent to driver
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 7 (task 24, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 18, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 3 (task 20, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 5 (task 22, attempt 0, stage 6.0)
23/02/13 07:28:56 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Unable to get Token Metadata
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$tokenMap$1(LocalNodeFirstLoadBalancingPolicy.scala:86)
    at scala.Option.orElse(Option.scala:447)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.tokenMap(LocalNodeFirstLoadBalancingPolicy.scala:86)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.replicasForRoutingKey$1(LocalNodeFirstLoadBalancingPolicy.scala:103)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$8(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.flatMap(Option.scala:271)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$7(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.orElse(Option.scala:447)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$3(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.flatMap(Option.scala:271)
    ...
    ...

23/02/13 07:28:56 ERROR Utils: Aborting task

Solution

  • You can remove:

    com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 
    

    It is not required with Spark 3 Cassandra Connector and was created for Spark 2 only. Also remove references to it in the code.