Tags: apache-spark, cassandra, spark-streaming, akka-stream, phantom-dsl

How to effectively read millions of rows from Cassandra?


I have the hard task of reading millions of rows from a Cassandra table. This table contains around 40-50 million rows. The data is internal URLs for our system, and we need to fire all of them. To fire them we are using Akka Streams, and it has been working pretty well, applying back pressure as needed. But we still have not found a way to read everything effectively.

What we have tried so far:

  • Reading the data as a stream using Akka Streams. We are using phantom-dsl, which provides a publisher for a specific table. But it does not read everything, only a small portion; it stops reading after the first 1 million rows.

  • Reading with Spark, filtered by a specific date. Our table is modeled like a time series, with year, month, day, minute... columns. Right now we are selecting by day, so Spark does not fetch too much data at once, but it is a pain to have to select all those days (see the sketch after the snippet below).

The code is the following:

import com.datastax.spark.connector._   // provides sc.cassandraTable

val cassandraRdd =
  sc.cassandraTable("keyspace", "my_table")
    .select("id", "url")
    .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)

Unfortunately I can't iterate over the partitions to fetch less data at a time; I have to use collect, because Spark complains that the actor is not serializable.
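Concretely, this is the kind of thing that blows up with that serialization error (a rough sketch; ref is the actor materialized by the stream shown below):

cassandraRdd.foreachPartition { rows =>
  // Spark has to serialize this closure to ship it to the executors,
  // and it rejects the captured actor reference.
  rows.foreach(row => ref ! row)
}

So the version that actually runs, for now, is the collect-based one: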

val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] =
  Http().cachedHostConnectionPool[String](host, port).async

// Source.actorRef materializes an ActorRef; every CassandraRow sent to it is
// turned into an (HttpRequest, url) pair for the pool.
val source =
  Source
    .actorRef[CassandraRow](10000000, OverflowStrategy.fail)
    .map(row => makeUrl(row.getString("id"), row.getString("url")))
    .map(url => HttpRequest(uri = url) -> url)

val ref = Flow[(HttpRequest, String)]
  .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
  .to(Sink.actorRef(httpHandlerActor, IsDone))
  .runWith(source)   // `ref` is the ActorRef materialized by `source`

// Here is the problem: collect() pulls every row to the driver before feeding the stream.
cassandraRdd.collect().foreach { row =>
  ref ! row
}

I would like to know whether any of you have experience reading millions of rows for purposes other than aggregation and the like.

I have also thought about reading everything and sending it to a Kafka topic, where I would consume it with streaming (Spark or Akka), but the problem would be the same: how do I load all that data effectively?
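For completeness, the Kafka idea would be roughly this (just a sketch: the broker address and the topic name "internal-urls" are placeholders, and it uses the plain kafka-clients producer):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

cassandraRdd.foreachPartition { partition =>
  // Producers are not serializable, so one is created inside each partition's closure.
  val props = new Properties()
  props.put("bootstrap.servers", "kafka-broker:9092")   // placeholder address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  partition.foreach { row =>
    producer.send(new ProducerRecord("internal-urls", row.getString("id"), row.getString("url")))
  }
  producer.close()
}

Nothing non-serializable is captured by that closure, so foreachPartition works here, unlike with the actor ref.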

EDIT

For now I'm running on a cluster with a reasonable amount of memory (100 GB), doing a collect and iterating over the result.

Also, this is quite different from fetching big data with Spark and analyzing it with operations like reduceByKey, aggregateByKey, etc.

I need to fetch and send everything over HTTP =/

So far it works the way I described, but I'm afraid the data will keep growing to the point where fetching everything into memory makes no sense.

Streaming this data, fetching it in chunks, would be the best solution, but I haven't found a good approach for it yet.
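By chunks I mean something like the driver's native paging; a minimal sketch with the plain DataStax Java driver (3.x API assumed; contact point and fetch size are placeholders, and this is a naive full table scan):

import com.datastax.driver.core.{Cluster, SimpleStatement}
import scala.collection.JavaConverters._

val cluster = Cluster.builder().addContactPoint("cassandra-host").build()   // placeholder host
val session = cluster.connect("keyspace")

// The driver pages transparently: iterating the ResultSet pulls 1000 rows at a
// time from Cassandra instead of holding 40-50 million rows in memory.
val stmt = new SimpleStatement("SELECT id, url FROM my_table").setFetchSize(1000)

session.execute(stmt).iterator().asScala.foreach { row =>
  val url = makeUrl(row.getString("id"), row.getString("url"))
  // feed `url` into the Akka stream here
}

cluster.close()

Wrapping that iterator in Source.fromIterator would plug it into the same back-pressured pipeline as above.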

In the end I'm thinking of using Spark to get all the data, generating a CSV file, and using Akka Stream IO to process it; this way I would avoid keeping a lot of things in memory, since it takes hours to process each million rows.
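On the Akka side, consuming such a CSV could look like this (a sketch; it assumes one id,url pair per line, an Akka Streams version that has FileIO.fromPath, and it reuses httpPool, makeUrl, httpHandlerActor and IsDone from above):

import java.nio.file.Paths
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString

FileIO.fromPath(Paths.get("/tmp/urls.csv"))   // placeholder path
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = true))
  .map(_.utf8String)
  .map { line =>
    val Array(id, rawUrl) = line.split(",", 2)
    val url = makeUrl(id, rawUrl)
    HttpRequest(uri = url) -> url
  }
  .via(httpPool)
  .runWith(Sink.actorRef(httpHandlerActor, IsDone))

The file is read on demand, so back pressure from the HTTP pool decides how much of it sits in memory at any time.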


Solution

  • Well, after spending some time reading, talking with other people and running tests, the result can be achieved with the following code sample:

    import org.apache.spark.{HashPartitioner, SparkContext}
    import com.datastax.spark.connector._                  // provides sc.cassandraTable
    import akka.actor.{Actor, ActorSystem, Props}
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}

    val sc = new SparkContext(sparkConf)

    // Turn the table into a (key, value) pair RDD, spread it with an explicit
    // partitioner and cache it so the groupByKey below does not reshuffle again.
    val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable")
      .select("key", "value")
      .as((key: String, value: String) => (key, value))
      .partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
      .cache()

    cassandraRdd
      .groupByKey()
      .foreachPartition { partition =>
        partition.foreach { row =>

          // Actor system, materializer and stream are created on the executor,
          // one per key group, so nothing needs to be serialized from the driver.
          implicit val system = ActorSystem()
          implicit val materializer = ActorMaterializer()

          val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor")

          // row._2 is the Iterable of values that groupByKey produced for this key.
          val source = Source.fromIterator { () => row._2.toIterator }
          source
            .map { str =>
              myActor ! Count
              str
            }
            .to(Sink.actorRef(myActor, Finish))   // Finish is sent when the stream completes
            .run()
        }
      }

    sc.stop()


    class MyActor(system: ActorSystem) extends Actor {

      var count = 0

      def receive = {

        case Count =>
          count = count + 1

        case Finish =>
          println(s"total: $count")
          system.shutdown()                       // tear down this key group's actor system

      }
    }

    case object Count
    case object Finish
    

    What I'm doing is the following:

    • Try to get a good number of partitions and a partitioner using the partitionBy and groupByKey methods
    • Use cache() to prevent a data shuffle, which would make Spark move large amounts of data across nodes and use a lot of IO
    • Create the whole actor system, with its dependencies, as well as the stream, inside the foreachPartition method. Here is a trade-off: you can have only one ActorSystem, but then you have to fall back on .collect, as I wrote in the question. By creating everything inside, you keep the ability to run things inside Spark, distributed across your cluster (a possible variant is sketched after this list).
    • Shut down each actor system at the end of its iterator, using Sink.actorRef with a termination message (Finish)
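
    A possible refinement of the same idea (only a sketch, under the same assumptions as the sample above): create one ActorSystem per partition instead of one per key group, and block until every stream in that partition has completed before shutting it down. It needs scala.concurrent.{Await, Future} and scala.concurrent.duration.Duration in addition to the imports above.

    cassandraRdd
      .groupByKey()
      .foreachPartition { partition =>
        implicit val system = ActorSystem("per-partition")
        implicit val materializer = ActorMaterializer()
        import system.dispatcher

        // One stream per key group, all sharing the partition's actor system.
        val completions = partition.map { case (_, values) =>
          Source.fromIterator(() => values.iterator)
            .runWith(Sink.foreach(value => println(value)))   // replace println with the real work
        }.toList

        // Block this Spark task until every stream is done, then tear the system down.
        Await.result(Future.sequence(completions), Duration.Inf)
        system.shutdown()
      }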

    Perhaps this code could be improved further, but so far I'm happy to no longer use .collect and to work only inside Spark.