python · apache-spark · pyspark · cassandra · spark-cassandra-connector

How to use a SparkSession in a DataFrame write in PySpark with the spark-cassandra-connector


I am using PySpark with spark-cassandra-connector_2.11-2.3.0.jar and a Cassandra DB. I am reading a DataFrame from one keyspace and writing to a different keyspace. These two keyspaces have different usernames and passwords.

I created the SparkSession using:

from pyspark import SparkConf
from pyspark.sql import SparkSession

spark_session = None

def set_up_spark(sparkconf, config):
    """
    Sets up the Spark configuration and creates a session.
    :return: None
    """
    try:
        logger.info("spark conf set up Started")
        global spark_session
        spark_conf = SparkConf()
        for key, val in sparkconf.items():
            spark_conf.set(key, val)
        spark_session = SparkSession.builder.config(conf=spark_conf).getOrCreate()
        logger.info("spark conf set up Completed")
    except Exception:
        logger.exception("spark conf set up Failed")
        raise
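For context, this is the shape of the `sparkconf` dictionary that `set_up_spark` expects: a plain dict of Spark / spark-cassandra-connector settings. The keys are standard configuration names, but the app name, host IP, and credentials below are placeholder values of my own, not taken from the original post:

```python
# Hypothetical example of the dict handed to set_up_spark().
# Every key is a standard Spark / connector setting; the values
# here are placeholders only.
sparkconf = {
    "spark.app.name": "cassandra-copy-job",
    "spark.cassandra.connection.host": "10.0.0.1",
    "spark.cassandra.auth.username": "username1",
    "spark.cassandra.auth.password": "password1",
}

# set_up_spark(sparkconf, config)  # would build the shared SparkSession
```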

I used this SparkSession to read data as a DataFrame:

table_df = spark_session.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace_name) \
            .load()

I am able to read data using the session above; `spark_session` is attached to that query.

Now I need to create another session, since the credentials for the write table are different. My write query is:

table_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace_name) \
            .mode("append") \
            .save()

I couldn't find how to attach a new SparkSession to the above write operation for Cassandra.

How do I attach a new SparkSession to a write operation in PySpark with the spark-cassandra-connector?


Solution

  • You don't need a second SparkSession: you can simply pass the connection information as options to the specific read or write operation. This includes settings such as spark.cassandra.connection.host, spark.cassandra.auth.username, and spark.cassandra.auth.password.

    Please note that in PySpark you'll need to put these options into a dictionary and unpack it with `**`, rather than passing them directly the way the documentation describes for Scala.

    read_options = { "table": "..", "keyspace": "..", 
      "spark.cassandra.connection.host": "IP1", 
      "spark.cassandra.auth.username": "username1", 
      "spark.cassandra.auth.password":"password1"}
    table_df = spark_session.read \
                .format("org.apache.spark.sql.cassandra") \
                .options(**read_options) \
                .load()
    
    write_options = { "table": "..", "keyspace": "..",
      "spark.cassandra.connection.host": "IP2",
      "spark.cassandra.auth.username": "username2",
      "spark.cassandra.auth.password": "password2"}
    table_df.write \
                .format("org.apache.spark.sql.cassandra") \
                .options(**write_options) \
                .mode("append") \
                .save()
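Since the two dictionaries differ only in host and credentials, it can help to build them with a small helper. The function below and its parameter names are my own sketch, not part of the connector's API; the option keys are the standard spark-cassandra-connector settings used above:

```python
def cassandra_options(table, keyspace, host, username, password):
    """Build the options dict for the org.apache.spark.sql.cassandra
    format, bundling table location with per-cluster credentials."""
    return {
        "table": table,
        "keyspace": keyspace,
        "spark.cassandra.connection.host": host,
        "spark.cassandra.auth.username": username,
        "spark.cassandra.auth.password": password,
    }

# Hypothetical usage: one dict per cluster, unpacked with ** as above.
read_options = cassandra_options("src_table", "src_ks", "IP1", "username1", "password1")
write_options = cassandra_options("dst_table", "dst_ks", "IP2", "username2", "password2")
```

Passing `.options(**write_options)` then carries the second cluster's host and credentials with the write itself, so the single shared SparkSession is enough.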