mysql cassandra apache-spark datastax-enterprise

C* migrations: moving 1B+ rows of table data to a table with a new schema


I am working with DataStax Enterprise (DSE) 4.7, C* 2.1.5 and Spark 1.2.1, and I need to migrate the data from a big table to a new, empty table that has a different schema and an additional column generated from one of the existing columns in the big table.

I know that table data can be migrated to another table with a new schema using Spark, or by exporting it to a CSV file with the COPY command in cqlsh. However, I am looking for a tool that offers a long-term solution for future migrations and more options for managing and planning them.
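For the "managing and planning" part, one common pattern is to keep an ordered list of migration steps and record which ones have already been applied, so that each run only executes what is still pending. The sketch below is a hypothetical illustration in plain Scala, not an existing tool: `MigrationStep`, `MigrationPlan`, `pending` and `runPending` are invented names, and persisting the set of applied ids (for example in a small Cassandra table) is left out.

```scala
// Hypothetical sketch: named, numbered migration steps plus a record of
// which ids have already been applied. Not the API of any real tool.
final case class MigrationStep(id: Int, description: String, run: () => Unit)

object MigrationPlan {
  // Steps that have not been applied yet, in ascending id order.
  def pending(steps: Seq[MigrationStep], applied: Set[Int]): Seq[MigrationStep] =
    steps.filterNot(s => applied.contains(s.id)).sortBy(_.id)

  // Runs every pending step in order and returns the updated set of applied ids.
  // A real setup would persist this set after each step (assumption, not shown).
  def runPending(steps: Seq[MigrationStep], applied: Set[Int]): Set[Int] =
    pending(steps, applied).foldLeft(applied) { (done, step) =>
      step.run()
      done + step.id
    }
}
```

Each `run` function could wrap a Spark job that copies and transforms one table; because applied ids are recorded, re-running the plan skips steps that already finished.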

I think this is a common problem, but I haven't found a solid solution.

Any ideas?


Solution

  • I'm now convinced that Spark is the best tool for the job. I tested the code below and the results are good.

    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    import com.github.nscala_time.time.Imports._

    object Migration {

      // Derive the new "k" column (e.g. "2015-7-3") from the "x" timestamp.
      // Note: no zero padding, and DateTime uses the JVM's default time zone.
      def changeDate(created: java.util.Date): String = {
        val d = new DateTime(created)
        s"${d.getYear}-${d.getMonthOfYear}-${d.getDayOfMonth}"
      }

      def main(args: Array[String]): Unit = {
        // Spark configuration (Cassandra host comes from spark.cassandra.connection.host)
        val conf = new SparkConf().setAppName("migration")
        val sc = new SparkContext(conf)
        // CassandraConnector is serializable, so it can be used inside executor closures
        val connector = CassandraConnector(conf)

        val rdd = sc.cassandraTable("keyspace", "table_a")

        println("Starting migration...")

        rdd.foreachPartition { rows =>
          connector.withSessionDo { session =>
            // Prepare the INSERT once per partition instead of once per row
            val statement = session.prepare(
              "INSERT INTO keyspace.table_b (k, y, z, x, t) VALUES (?, ?, ?, ?, ?)")

            rows.foreach { row =>
              val x = new java.util.Date(row.getLong("x"))
              val y = new java.util.Date(row.getLong("y"))
              val z = row.getString("z")
              val t = row.getString("t")
              val k = changeDate(x)

              // Synchronous write: each insert completes before the next row is read
              session.execute(statement.bind(k, y, z, x, t))
            }
          }
        }

        println("Done.")
        sc.stop()
      }
    }
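The per-row transform is the part most likely to change from one migration to the next, so it can help to keep it as a plain function that is unit-tested without a cluster. A minimal sketch, assuming `x` holds epoch milliseconds (as `row.getLong("x")` suggests) and using `java.time` instead of nscala-time; `DerivedColumns`, `createdDate` and `createdDateIso` are hypothetical names. Passing the time zone explicitly avoids depending on each executor's default zone.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object DerivedColumns {
  // Same shape as changeDate: "year-month-day" without zero padding,
  // computed in an explicitly supplied time zone.
  def createdDate(epochMillis: Long, zone: ZoneId): String = {
    val d = Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate
    s"${d.getYear}-${d.getMonthValue}-${d.getDayOfMonth}"
  }

  // Zero-padded ISO variant (yyyy-MM-dd), whose string order matches date order.
  def createdDateIso(epochMillis: Long, zone: ZoneId): String =
    Instant.ofEpochMilli(epochMillis)
      .atZone(zone)
      .toLocalDate
      .format(DateTimeFormatter.ISO_LOCAL_DATE)
}
```

The padded form matters if `k` is ever compared or sorted as text: unpadded, "2015-10-1" sorts before "2015-9-1".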