Tags: scala, apache-spark, cassandra, data-migration, datastax-enterprise

Cassandra big table migration bottleneck


I'm trying to migrate a big table in Cassandra to a new, empty table (with a different primary key) in the same keyspace and cluster, using Spark 1.2.1:

    val rdd_table_a = sc.cassandraTable("keyspace", "table_a")
      .filter(row => row.getLong("a") >= start_a && row.getLong("a") <= end_a)

    rdd_table_a.map(row => { 
        val a = row.getLong("a")
        val b = row.getLong("b")
        val c = row.getString("c")
        val d = row.getString("d")
        val new_a = generateSomeNewValue(a)
        connector.withSessionDo(session => {
            val statement = session.prepare(s"INSERT INTO keyspace.table_b (new_a, c, b, a, d) " + "values (?, ?, ?, ?, ?)")
            val bound = statement.bind(new_a, c, b, a, d)
            session.executeAsync(bound)
        })
    }).foreach(x => x.getUninterruptibly())

The table has more than 1B rows, and even when I try to process only a small part of it, it takes more than 7 hours. I searched the documentation and couldn't find out: does connector.withSessionDo open another session on each loop iteration?

What could be the bottleneck in the above code snippet?


Solution

  • conn.withSessionDo executes a custom CQL query using the current shared connection to Cassandra:

    Allows to use Cassandra Session in a safe way without risk of forgetting to close it. The Session object obtained through this method is a proxy to a shared, single Session associated with the cluster.

    Internally, the shared underlying Session will be closed shortly after all the proxies are closed.
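
    A quick sketch (not from the connector docs) of what the shared session means in practice; the system.local queries are just arbitrary lightweight statements:

        import com.datastax.spark.connector.cql.CassandraConnector

        val connector = CassandraConnector(sc.getConf)

        // Both calls receive a proxy to the same shared Session, so withSessionDo
        // does not open a new connection on every invocation. Preparing the
        // statement inside the per-row loop, as in the question, still repeats
        // that work for every row, though.
        connector.withSessionDo(session =>
          session.execute("SELECT release_version FROM system.local"))
        connector.withSessionDo(session =>
          session.execute("SELECT cluster_name FROM system.local"))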

    You can rewrite your code using the saveToCassandra approach, which is more typical; see the sketch below.
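
    A minimal sketch of that rewrite, assuming table_b has exactly the columns new_a, c, b, a, d and reusing the question's generateSomeNewValue:

        import com.datastax.spark.connector._

        // Read, filter and transform as before, but emit plain tuples and let the
        // connector handle batching, async writes and session management.
        val rdd_table_b = sc.cassandraTable("keyspace", "table_a")
          .filter(row => row.getLong("a") >= start_a && row.getLong("a") <= end_a)
          .map { row =>
            val a = row.getLong("a")
            (generateSomeNewValue(a), row.getString("c"), row.getLong("b"), a, row.getString("d"))
          }

        // Tuple fields map positionally onto the listed columns of table_b.
        rdd_table_b.saveToCassandra("keyspace", "table_b",
          SomeColumns("new_a", "c", "b", "a", "d"))

    This way each executor writes its partitions in parallel, without preparing a new statement per row.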

    In my personal experience with Spark + Cassandra, the slowest point in such jobs is Cassandra itself: full data scans over huge tables are really slow (compared to Parquet).