scala, apache-spark, cassandra, bigdata, spark-cassandra-connector

How to load a CSV a couple of lines at a time


I'm connecting Spark to Cassandra, and I was able to print the rows of my CSV using the conventional COPY method. However, if the CSV is very large, as is usual in Big Data, how could one load the file a couple of lines at a time in order to avoid freezing and related issues?

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SparkCassandra {

  def main(args: Array[String]): Unit = {

      // Connect to a local Cassandra node and read the table as an RDD
      val conf = new SparkConf()
        .setAppName("SparkCassandra")
        .setMaster("local")
        .set("spark.cassandra.connection.host", "localhost")
      val sc = new SparkContext(conf)
      val my_rdd = sc.cassandraTable("my_keyspace", "my_csv")
      my_rdd.take(20).foreach(println) // print only the first 20 rows
      sc.stop()
  }
}

Should one use a time variable or something of that nature?


Solution

  • If you just want to load data into Cassandra, or unload data from Cassandra, using the command line, I would recommend looking at the DataStax Bulk Loader (DSBulk) - it's heavily optimized for loading data to and unloading data from Cassandra/DSE, and it works with both open-source Cassandra and DSE.

    In the simplest case, loading into and unloading from a table looks like this (the default format is CSV):

    dsbulk load -k keyspace -t table -url my_file.csv
    dsbulk unload -k keyspace -t table -url my_file.csv
    

    For more complex cases you may need to provide more options. You can find more information in the DataStax series of blog posts on DSBulk.
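    For example, CSV parsing can be tuned with shortcut options such as -header and -delim (the keyspace and table names below are placeholders):

    dsbulk load -k my_keyspace -t my_table -url my_file.csv -header true -delim ','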

    If you want to do this with Spark, then I recommend using the DataFrame API instead of RDDs. In that case you just use the standard read and write functions.

    To export data from Cassandra to CSV:

    import org.apache.spark.sql.cassandra._
    // read the table "tbl" in keyspace "ks" into a DataFrame
    val data = spark.read.cassandraFormat("tbl", "ks").load()
    // note: this writes a directory named my_file.csv containing part files
    data.write.format("csv").save("my_file.csv")
    
    

    or read from CSV and store in Cassandra:

    import org.apache.spark.sql.cassandra._
    import org.apache.spark.sql.SaveMode
    // read the CSV into a DataFrame (use .load, not .save, when reading)
    val data = spark.read.format("csv").load("my_file.csv")
    // append the rows to the table "tbl" in keyspace "ks"
    data.write.cassandraFormat("tbl", "ks").mode(SaveMode.Append).save()
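    Note that Spark already reads CSV files partition by partition, so you normally don't need to chunk the file yourself. If you really do need to process a large CSV a few lines at a time from plain Scala (outside Spark), a lazy iterator keeps memory bounded. A minimal sketch, assuming a local file path; `ChunkedCsvReader` and the batch size are made-up names, and inside `handle` you could, for example, issue the connector's inserts for each batch:

```scala
import scala.io.Source

object ChunkedCsvReader {
  // Stream a CSV file in fixed-size batches so the whole file
  // is never held in memory at once.
  // getLines() is lazy; grouped(batchSize) yields batches of
  // at most batchSize lines each.
  def processInBatches(path: String, batchSize: Int)(handle: Seq[String] => Unit): Unit = {
    val source = Source.fromFile(path)
    try source.getLines().grouped(batchSize).foreach(handle)
    finally source.close()
  }
}
```

    Each batch can then be written out before the next one is read, which avoids the freezing issues mentioned in the question.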