
spark setCassandraConf is not working as expected


I am using .setCassandraConf(c_options_conf) to point the SparkSession at my Cassandra cluster, as shown below.

Working fine:

    val spark = SparkSession
      .builder()
      .appName("DatabaseMigrationUtility")
      .config("spark.master", devProps.getString("deploymentMaster"))
      .getOrCreate()
      .setCassandraConf(c_options_conf)

If I save a table using the DataFrame writer object, as below, it points to the configured cluster and saves to Cassandra perfectly fine:

    writeDfToCassandra(o_vals_df, key_space, "model_vals") // working fine using o_vals_df
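The `writeDfToCassandra` helper itself is not shown in the question; a minimal sketch of what such a helper typically looks like, assuming the connector's DataFrame source (`org.apache.spark.sql.cassandra`) and a hypothetical signature, would be:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper (not shown in the question): writes a DataFrame to a
// Cassandra table via the spark-cassandra-connector DataFrame source.
def writeDfToCassandra(df: DataFrame, keyspace: String, table: String): Unit = {
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> keyspace, "table" -> table))
    .mode(SaveMode.Append)
    .save()
}
```

Because this path goes through the Spark SQL data source, it picks up the session-level options set by `setCassandraConf`, which is why it reaches the configured cluster.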

But if I save via an RDD, as below, it points to localhost instead of the Cassandra cluster and fails to save.

Not working:

import spark.implicits._
val sc = spark.sparkContext

    sc.parallelize(Seq(LogCaseClass(columnFamilyName, status, error_msg,
        currentDate, currentTimeStamp, updated_user)))
      .saveToCassandra(keyspace, columnFamilyName)

It throws an error because it is trying to connect to localhost.

Error:

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All
host(s) tried for query failed (tried: localhost/127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException:
[localhost/127.0.0.1:9042] Cannot connect))
            at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)

What is wrong here? Why is it pointing to the default localhost even though the SparkSession is set to the Cassandra cluster, and why does the earlier method work fine?


Solution

  • We need to set the configuration through both setter mechanisms of SparkSession, i.e. .config(conf) and .setCassandraConf(c_options_conf), with the same values, like below:

      val spark = SparkSession
            .builder()
            .appName("DatabaseMigrationUtility")
            .config("spark.master", devProps.getString("deploymentMaster"))
            .config("spark.dynamicAllocation.enabled", devProps.getString("spark.dynamicAllocation.enabled"))
            .config("spark.executor.memory", devProps.getString("spark.executor.memory"))
            .config("spark.executor.cores", devProps.getString("spark.executor.cores"))
            .config("spark.executor.instances", devProps.getString("spark.executor.instances"))
            .config(conf)
            .getOrCreate()
            .setCassandraConf(c_options_conf)

    Then it works for the latest Cassandra DataFrame API as well as the RDD API: .config(conf) sets the connection host at the SparkConf level, which saveToCassandra (the RDD path) reads, while .setCassandraConf sets it at the session level for the DataFrame path. Setting only one of them leaves the other path falling back to the default localhost.
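    The `conf` and `c_options_conf` objects are not shown above. A sketch of how they might be built, assuming a `cassandraHost` entry in `devProps` (the property name is an example) and the connector's `CassandraConnectorConf` option helpers:

    ```scala
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.cassandra._
    import com.datastax.spark.connector.cql.CassandraConnectorConf

    // Example value; replace with your own property name / host list.
    val cassandraHost = devProps.getString("cassandraHost")

    // SparkConf-level setting: read by the RDD API (saveToCassandra).
    val conf = new SparkConf()
      .set("spark.cassandra.connection.host", cassandraHost)

    // Session-level options for setCassandraConf: read by the DataFrame API.
    val c_options_conf =
      CassandraConnectorConf.ConnectionHostParam.option(cassandraHost)
    ```

    The exact option helpers vary between connector versions, so check the version of spark-cassandra-connector you are on; the key point is that both objects carry the same `spark.cassandra.connection.host` value.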