Spark batch to migrate data between 2 Cassandra clusters


I'm using Spark to move some data from a Cassandra table on one cluster to a Cassandra table on another cluster.

I specified the Cassandra config for the source cluster as follows:

/*
spark.cassandra.connection.host: 
spark.cassandra.connection.port:
spark.cassandra.auth.username:
spark.cassandra.auth.password:
spark.cassandra.connection.ssl.clientAuth.enabled: true
spark.cassandra.connection.ssl.enabled: true
spark.cassandra.connection.ssl.trustStore.path: 
spark.cassandra.connection.ssl.trustStore.password: 
spark.cassandra.connection.timeout_ms: */

SparkSession spark = SparkSession.builder()
            .config(conf)
            .getOrCreate();

Dataset<Row> df = spark.read()
            .format("org.apache.spark.sql.cassandra")
            .options(config.getSourceTable())
            .load();
df.show();

// *** How/Where do I specify cassandra config in destination cluster? ***
df.write()
        .mode(SaveMode.Append)
        .format("org.apache.spark.sql.cassandra")
        .options(destinationTbl)
        .save(); // without save() the write is never executed

How and where do I specify the Cassandra config for the destination cluster (Java preferred)?

Thanks!


Solution

  • I haven't tested this, but based on Russell Spitzer's blog post, the following should work (not tested in Java):

    • Set two configuration options, one per cluster, each prefixed with a cluster name (or add them when creating the Spark session):
    spark.conf().set("ClusterSource/spark.cassandra.connection.host", "127.0.0.1");
    spark.conf().set("ClusterDestination/spark.cassandra.connection.host", "127.0.0.2");
    
    • In each options call, add a "cluster" entry whose value is the name of the corresponding cluster (ClusterSource for the read, ClusterDestination for the write).

    P.S. Also, remember that if you need to migrate data and keep the WriteTime and/or TTL of the data, you'll need to use the RDD API, as these aren't supported in the DataFrame API.
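
    The two steps above can be sketched with a small helper. It is as untested against a real cluster as the rest of this answer. ClusterOptions, clusterKey and tableOptions are hypothetical names, not part of the Spark Cassandra Connector, and the keyspace and table names in the usage comment are placeholders. The helper only builds the prefixed setting names and the options map. The Spark calls are shown in the trailing comment so the block compiles without Spark on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the strings described in the two steps above.
final class ClusterOptions {

    private ClusterOptions() {}

    // Step 1: prefix a connector setting with a cluster name, producing e.g.
    // "ClusterSource/spark.cassandra.connection.host".
    static String clusterKey(String cluster, String setting) {
        return cluster + "/" + setting;
    }

    // Step 2: options for read()/write(), including the "cluster" entry
    // that names which cluster's settings should be used.
    static Map<String, String> tableOptions(String cluster, String keyspace, String table) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("cluster", cluster);
        options.put("keyspace", keyspace);
        options.put("table", table);
        return options;
    }

    // Intended use (assumption, mirrors the answer; placeholder names):
    //
    //   spark.conf().set(
    //       ClusterOptions.clusterKey("ClusterDestination", "spark.cassandra.connection.host"),
    //       "127.0.0.2");
    //   df.write()
    //       .format("org.apache.spark.sql.cassandra")
    //       .options(ClusterOptions.tableOptions("ClusterDestination", "my_keyspace", "my_table"))
    //       .mode(SaveMode.Append)
    //       .save();
}
```

    The other per-cluster settings from the question (port, credentials, SSL) would presumably be prefixed with the cluster name in the same way, but that is an assumption to verify against the connector documentation for your version.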