Tags: scala, apache-spark, cassandra, datastax-java-driver, spark-cassandra-connector

DataStax Spark Cassandra Connector with a RetryPolicy to write a DataFrame to a Cassandra table


I am trying to write a Spark DataFrame to Cassandra with consistency level "EACH_QUORUM". My code looks like this:

val sparkSession = SparkSession.builder()
  .config(cassandraHostProperty, cassandraHosts)
  .config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY)
  .config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY)
  .config(cassandraIsSSLEnabledProperty, isSSLEnabled)
  // ... remaining config calls elided
  .getOrCreate()

Below is the code that writes the DataFrame:

df.write.cassandraFormat(tableName, keySpaceName)
    .mode(SaveMode.Append)
    .options(Map(
      WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
      WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
    ))
    .save()
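
The consistency level itself goes through the same options map. A minimal sketch, assuming the connector's WriteConf.ConsistencyLevelParam key (the output.consistency.level setting):

df.write.cassandraFormat(tableName, keySpaceName)
    .mode(SaveMode.Append)
    .options(Map(
      // assumption: EACH_QUORUM is requested via the same WriteConf
      // mechanism as the parallelism and batch-size keys above
      WriteConf.ConsistencyLevelParam.name -> "EACH_QUORUM"
    ))
    .save()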

I want to add a retry policy so that, if one of the datacenters is down, the write downgrades its consistency level to LOCAL_QUORUM.

I am aware that DataStax has a class, MultipleRetryPolicy.scala, which I should extend, overriding its methods to add custom logic, and then use its instance in the Cassandra conf.

How can I apply this policy to my SparkSession or to the save operation? Is there any other way in Scala, with or without a RetryPolicy, to achieve this?


Solution

  • You don't want MultipleRetryPolicy; you're after DowngradingConsistencyRetryPolicy, which isn't part of the Spark connector, so doing this through the connector's driver settings is out unless you port that policy over to Scala.

    What you can do is wrap the write in a try, catch the UnavailableException, and re-try at a lower consistency level by changing the output.consistency.level parameter, as in the sketch below.
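
    A minimal sketch of that fallback, assuming the 2.x connector API (cassandraFormat, WriteConf.ConsistencyLevelParam) and the Java driver's UnavailableException; saveWithFallback and causedByUnavailable are illustrative names, and the cause-chain walk is there because Spark typically wraps the driver's exception before it reaches the caller:

    import scala.annotation.tailrec
    import org.apache.spark.sql.{DataFrame, SaveMode}
    import org.apache.spark.sql.cassandra._
    import com.datastax.spark.connector.writer.WriteConf
    import com.datastax.driver.core.exceptions.UnavailableException

    // Spark usually surfaces driver errors wrapped (e.g. in a SparkException),
    // so walk the cause chain rather than matching the top-level type.
    @tailrec
    def causedByUnavailable(t: Throwable): Boolean = t match {
      case null                    => false
      case _: UnavailableException => true
      case other                   => causedByUnavailable(other.getCause)
    }

    def saveWithFallback(df: DataFrame, tableName: String, keySpaceName: String): Unit = {
      def save(consistency: String): Unit =
        df.write.cassandraFormat(tableName, keySpaceName)
          .mode(SaveMode.Append)
          .options(Map(WriteConf.ConsistencyLevelParam.name -> consistency))
          .save()

      try save("EACH_QUORUM")
      catch {
        case e: Exception if causedByUnavailable(e) =>
          // a datacenter couldn't satisfy EACH_QUORUM: downgrade and retry once
          save("LOCAL_QUORUM")
      }
    }

    Calling saveWithFallback(df, tableName, keySpaceName) then replaces the plain save; only the consistency level changes between the two attempts.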