scala cassandra akka

How to stream all records from Cassandra?


I need to stream all records from Cassandra. Currently I am using akka-persistence-cassandra to stream the data:

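    import akka.persistence.query.PersistenceQuery
    import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
    import com.datastax.driver.core.SimpleStatement

    val querier =
      PersistenceQuery(system)
        .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    // One row per (persistence_id, partition_nr) partition key in the journal table
    val selectDistinctPersistenceIds = new SimpleStatement(
      "SELECT DISTINCT persistence_id, partition_nr FROM messages")
      .setFetchSize(100000)

    // Source of persistence ids; the driver fetches the result page by page
    querier.session.select(selectDistinctPersistenceIds).map { row =>
      val id = row.getString(0)
      id
    }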

This works fine when there are around 1.5 million records, but once the number of records exceeds 1.5 million I get a read timeout error.
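A side note on the query itself: because SELECT DISTINCT persistence_id, partition_nr returns one row per partition, a persistence id whose events span several partitions shows up once per partition after the .map. The following is a minimal sketch of dropping those repeats with Akka Streams' statefulMapConcat. It assumes Akka 2.6 (where an implicit ActorSystem supplies the materializer); the object name DistinctIds and the in-memory stand-in Source are hypothetical and only replace the real Cassandra query for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical helper, not part of akka-persistence-cassandra.
object DistinctIds {
  // Emits each id only the first time it is seen. The query yields one row per
  // (persistence_id, partition_nr), so without this an id spanning several
  // partitions would be emitted several times.
  // Note: the seen-set is held in memory and grows with the number of distinct ids.
  def dedup(ids: Source[String, NotUsed]): Source[String, NotUsed] =
    ids.statefulMapConcat { () =>
      val seen = scala.collection.mutable.HashSet.empty[String]
      id => if (seen.add(id)) List(id) else Nil
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("distinct-ids-demo")
    // Stand-in for querier.session.select(...).map(_.getString(0))
    val rows = Source(List("user-1", "user-1", "user-2", "order-7", "user-2"))
    val result: Future[Seq[String]] = dedup(rows).runWith(Sink.seq)
    println(Await.result(result, 5.seconds))
    system.terminate()
  }
}
```

With the real query you would pass the Source built from querier.session.select(...) instead of the stand-in list.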

I am using:

"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.58"
"com.typesafe.akka" %% "akka-persistence" % "2.6.12"
"com.typesafe.akka" %% "akka-persistence-query" % "2.6.12"

Edit: Error Logs:

com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response", exceptionStackTrace="java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [/<ip-address>:9042] Timed out waiting for server response
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
    at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:25)
...

Solution

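  • I fixed this by setting cassandra-journal.socket.read-timeout-millis to a value higher than its default of 12000 ms. The OperationTimedOutException above is raised by the driver when a host does not answer a request within this client-side read timeout, so raising it gives Cassandra more time to return each page of results.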

    cassandra-journal {
      ...

      socket {
        # the per-host read timeout in milliseconds. Should be higher than the timeout settings
        # used on the Cassandra side.
        read-timeout-millis = 30000
      }
    }
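If editing application.conf is inconvenient (for example in a test), the same setting can likely be supplied in code. The sketch below assumes the Typesafe Config library that Akka already depends on; the object name ReadTimeoutConfig is hypothetical, and the config path is the one from the answer above.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper that layers the timeout override on top of the normal config.
object ReadTimeoutConfig {
  // Raise the plugin's per-host socket read timeout from the 12000 ms default.
  val overrides: Config = ConfigFactory.parseString(
    "cassandra-journal.socket.read-timeout-millis = 30000")

  // Overrides win; everything else comes from application.conf / reference.conf.
  def load(): Config = overrides.withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val config = load()
    println(config.getInt("cassandra-journal.socket.read-timeout-millis"))
  }
}
```

The resulting Config would then be passed when creating the actor system, e.g. ActorSystem("my-system", ReadTimeoutConfig.load()), so the journal plugin picks up the higher timeout.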