I am using .setCassandraConf(c_options_conf) to set sparkSession to connect cassandra cluster as show below.
Working fine:
val spark = SparkSession
.builder()
.appName("DatabaseMigrationUtility")
.config("spark.master",devProps.getString("deploymentMaster"))
.getOrCreate()
.setCassandraConf(c_options_conf)
If I save table using dataframe writer object as below it is pointing to the configured cluster and saving in Cassandra perfectly fine as below
writeDfToCassandra(o_vals_df, key_space , "model_vals"); //working fine using o_vals_df.
But if say as below it is pointing to localhost instead of cassandra cluster and failing to save.
Not working:
import spark.implicits._
val sc = spark.sparkContext
val audit_df = sc.parallelize(Seq(LogCaseClass(columnFamilyName, status,
error_msg,currentDate,currentTimeStamp, updated_user))).saveToCassandra(keyspace, columnFamilyName);
It is throwing error as it is trying connect localhost.
Error:
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All
host(s) tried for query failed (tried: localhost/127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException:
[localhost/127.0.0.1:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)
What is wrong here? Why it is pointing to default localhost even though sparkSession set to cassandra cluster and earlier method is working fine.
We need to set the config using two set methods of SparkSession
, i.e. .config(conf)
and .setCassandraConf(c_options_conf)
with same values like below
val spark = SparkSession
.builder()
.appName("DatabaseMigrationUtility")
.config("spark.master",devProps.getString("deploymentMaster"))
.config("spark.dynamicAllocation.enabled",devProps.getString("spark.dynamicAllocation.enabled"))
.config("spark.executor.memory",devProps.getString("spark.executor.memory"))
.config("spark.executor.cores",devProps.getString("spark.executor.cores"))
.config("spark.executor.instances",devProps.getString("spark.executor.instances"))
.config(conf)
.getOrCreate()
.setCassandraConf(c_options_conf)
Then i would work for cassandra latest api as well as RDD/DF Api.