I am using PySpark with spark-cassandra-connector_2.11-2.3.0.jar and a Cassandra DB. I am reading a dataframe from one keyspace and writing it to a different keyspace. These two keyspaces have different usernames and passwords.
I created the SparkSession using:
import logging

from pyspark import SparkConf
from pyspark.sql import SparkSession

logger = logging.getLogger(__name__)

spark_session = None

def set_up_spark(sparkconf, config):
    """
    Sets up the Spark configuration and creates a session.
    :return: None
    """
    try:
        logger.info("spark conf set up Started")
        global spark_session
        spark_conf = SparkConf()
        for key, val in sparkconf.items():
            spark_conf.set(key, val)
        spark_session = SparkSession.builder.config(conf=spark_conf).getOrCreate()
        logger.info("spark conf set up Completed")
    except Exception as e:
        raise e
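For context, here is a hypothetical example of the sparkconf dictionary that set_up_spark consumes; every key and value below is an illustrative placeholder, not my actual configuration:

```python
# Hypothetical sparkconf dict for set_up_spark above; all values are
# placeholders, not real endpoints or credentials.
read_cluster_conf = {
    "spark.app.name": "cassandra-copy-job",
    "spark.cassandra.connection.host": "IP1",
    "spark.cassandra.auth.username": "username1",
    "spark.cassandra.auth.password": "password1",
}

# set_up_spark applies each pair via SparkConf.set(key, val), so any
# spark.* setting can be supplied through this dict.
```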
I used this SparkSession to read data into a dataframe:
table_df = spark_session.read \
.format("org.apache.spark.sql.cassandra") \
.options(table=table_name, keyspace=keyspace_name) \
.load()
I am able to read data using the above session; spark_session is attached to the read query above.
Now I need to create another session, since the credentials for the write table are different. The write query I have is:
table_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(table=table_name, keyspace=keyspace_name) \
.mode("append") \
.save()
I couldn't find a way to attach a new SparkSession to the above write operation.
How do I attach a new SparkSession for a write operation in PySpark with spark-cassandra-connector?
You can simply pass that information as options to the specific read or write operation; this includes settings such as spark.cassandra.connection.host, spark.cassandra.auth.username, and spark.cassandra.auth.password.
Please note that you'll need to put these options into a dictionary and pass that dictionary (unpacked with **) instead of passing the options directly, as described in the documentation.
read_options = {"table": "..", "keyspace": "..",
                "spark.cassandra.connection.host": "IP1",
                "spark.cassandra.auth.username": "username1",
                "spark.cassandra.auth.password": "password1"}
table_df = spark_session.read \
.format("org.apache.spark.sql.cassandra") \
.options(**read_options) \
.load()
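The dictionary form is needed because keys like spark.cassandra.connection.host contain dots and are therefore not valid Python identifiers, so they can't be written as bare keyword arguments. A tiny stand-in function (not the connector's real implementation) to illustrate what the ** unpacking does:

```python
# Stand-in for DataFrameReader.options(), which likewise just collects
# arbitrary keyword arguments into key/value pairs.
def options(**opts):
    return opts

# "spark.cassandra.connection.host" cannot be spelled as a bare keyword
# (dots are illegal in identifiers), so it must be passed via ** unpacking.
read_options = {"table": "t1", "keyspace": "ks1",
                "spark.cassandra.connection.host": "IP1"}

collected = options(**read_options)
assert collected["spark.cassandra.connection.host"] == "IP1"
```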
write_options = {"table": "..", "keyspace": "..",
                 "spark.cassandra.connection.host": "IP2",
                 "spark.cassandra.auth.username": "username2",
                 "spark.cassandra.auth.password": "password2"}
table_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(**write_options) \
.mode("append") \
.save()
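If the two clusters differ only in host and credentials, you can avoid duplicating the shared settings by deriving both dictionaries from a common base with plain dict merging. A minimal sketch, with illustrative table and credential values:

```python
# Shared read/write settings; table and keyspace names are placeholders.
common = {"table": "my_table", "keyspace": "my_keyspace"}

# Per-cluster connection details (illustrative values).
read_cluster = {"spark.cassandra.connection.host": "IP1",
                "spark.cassandra.auth.username": "username1",
                "spark.cassandra.auth.password": "password1"}

write_cluster = {"spark.cassandra.connection.host": "IP2",
                 "spark.cassandra.auth.username": "username2",
                 "spark.cassandra.auth.password": "password2"}

# {**a, **b} merges dicts, with b's keys winning on conflict (Python 3.5+).
read_options = {**common, **read_cluster}
write_options = {**common, **write_cluster}
```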