scala, slick, akka-stream, alpakka

Processing a big table with Slick fails with OutOfMemoryError


I am querying a big MySQL table with Akka Streams and Slick, but it fails with an OutOfMemoryError. It seems that Slick is loading all the results into memory (it does not fail if the query is limited to a few rows). Why is this the case, and what is the solution?

val dbUrl = "jdbc:mysql://..."

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import slick.jdbc.GetResult

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val slickDbConfig = s"""
 |profile = "slick.jdbc.MySQLProfile$$"
 |db {
 |  dataSourceClass = "slick.jdbc.DriverDataSource"
 |  properties = {
 |    driver = "com.mysql.jdbc.Driver",
 |    url = "$dbUrl"
 |  }
 |}
 |""".stripMargin

implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
implicit val slickSession: SlickSession = SlickSession.forConfig(ConfigFactory.parseString(slickDbConfig))
import slickSession.profile.api._

val responses: Source[String, NotUsed] = Slick.source(
  sql"select my_text from my_table".as(GetResult(r => r.nextString())) // limit 100
)

val future = responses.runForeach((myText: String) =>
  println("my_text: " + myText.length)
)

Await.result(future, Duration.Inf)

Solution

  • From the Slick documentation:

    Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.

    In other words, to prevent the JDBC driver from loading all of the query results into memory at once on the client side, additional configuration might be needed. This configuration is database dependent. The MySQL documentation states the following:

    By default, ResultSets are completely retrieved and stored in memory. In most cases this is the most efficient way to operate and, due to the design of the MySQL network protocol, is easier to implement. If you are working with ResultSets that have a large number of rows or large values and cannot allocate heap space in your JVM for the memory required, you can tell the driver to stream the results back one row at a time.

    To enable this functionality, create a Statement instance in the following manner:

    stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,
                  java.sql.ResultSet.CONCUR_READ_ONLY);
    stmt.setFetchSize(Integer.MIN_VALUE);
    

    The combination of a forward-only, read-only result set, with a fetch size of Integer.MIN_VALUE serves as a signal to the driver to stream result sets row-by-row.

    To set the above configuration in Slick:

    import slick.jdbc._
    
    val query =
      sql"select my_text from my_table".as(GetResult(r => r.nextString()))
        .withStatementParameters(
          rsType = ResultSetType.ForwardOnly,
          rsConcurrency = ResultSetConcurrency.ReadOnly,
          fetchSize = Int.MinValue // per the MySQL docs above: stream results row by row
        ) //.transactionally <-- I'm not sure whether ".transactionally" is needed for MySQL
    
    val responses: Source[String, NotUsed] = Slick.source(query)
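
    With that configuration in place, the rest of the original pipeline can stay as it was. A minimal sketch of running it, reusing the ActorSystem, Materializer and SlickSession already set up in the question:

    // Run the stream as before; memory usage should now stay bounded because
    // the MySQL driver streams rows instead of buffering the entire ResultSet.
    val future = responses.runForeach((myText: String) =>
      println("my_text: " + myText.length)
    )

    Await.result(future, Duration.Inf)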
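
    As an aside, for the PostgreSQL case quoted from the Slick documentation above, the same pattern would need a positive fetch size plus .transactionally. A hypothetical sketch (the fetch size of 1000 is an arbitrary example value):

    import slick.jdbc._

    // Forward-only, read-only result set with a page size, wrapped in a
    // transaction, as the Slick documentation recommends for PostgreSQL.
    val pgQuery =
      sql"select my_text from my_table".as(GetResult(r => r.nextString()))
        .withStatementParameters(
          rsType = ResultSetType.ForwardOnly,
          rsConcurrency = ResultSetConcurrency.ReadOnly,
          fetchSize = 1000
        )
        .transactionally

    val pgResponses: Source[String, NotUsed] = Slick.source(pgQuery)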