Tags: postgresql, scala, slick, concurrent.futures, slick-3.0

rejected from slick.util.AsyncExecutor on "large" Future.sequence


I spent all day trying to figure out how to solve this issue.

The goal is to insert several sequences of strings into a single column of a table.

I have a method like this:

case class Column(strings: Seq[String])

def insertColumns(columns: Seq[Column]) = for {
_ <- Future.sequence(columns.map(col => insert(col)))
} yield()

private def insert(column: Column) =
  db.run((stringTable ++= rows)) //slick batch insert

This works up to a point. I tested it with a sequence of 2100 columns (100 strings each), and it works fine. But as soon as I increase the number of columns to 3100 or more, I get this error:

Task slick.basic.BasicBackend$DatabaseDef$$anon$3@293ce053 rejected from slick.util.AsyncExecutor$$anon$1$$anon$2@3e423930[Running, pool size = 10, active threads = 10, queued tasks = 1000, completed tasks = 8160]

I have read in several places that doing something like this would help:

case class Column(strings: Seq[String])

val f = Future.sequence(columns.map(col => insert(col)))

def insertColumns(columns: Seq[Column]) = for {
_ <- f
} yield()

private def insert(column: Column) =
  db.run((stringTable ++= rows)) //slick batch insert

It does not.

I tried several combinations of changes inside insert:

Future.sequence(
  rows.grouped(500).toSeq.map(group => db.run(DBIO.seq(stringTable ++= group)))
)

Source(rows).buffer(500, OverflowStrategy.backpressure)
  .via(
    Slick.flow(row => stringTable += row)
  )
  .log("nr-of-inserted-rows")
  .runWith(Sink.ignore)

Source(rows)
  .runWith(Slick.sink(1, row => stringTable += row))

I also tried:

  • not using reWriteBatchedInserts=true in my config
  • the (dataColumnStringsTable ++= rows).transactionally option
  • using a dedicated single-threaded execution context, implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)), to try to execute the futures sequentially

The only other idea I have is to rework my subscriber so that it blocks on each message (a sequence of strings) and to handle the back pressure on the message queue side.

I am using slick (with alpakka-slick) 3.3.3 / HikariCP 3.2.0 / Postgres 13.2

My config is as follows:

slick {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    connectionPool = "HikariCP"
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      user = "postgres"
      password = "password"
      url = "jdbc:postgresql://"${slick.db.host}":5432/slick?reWriteBatchedInserts=true"
    }
    host = "localhost"
    numThreads = 10
    maxConnections = 100
    minConnections = 1
  }
}

Thank you for your help.


Solution

  • You shouldn't use Future.sequence on a large collection when building each element starts new work. A Future starts running in the background as soon as it is created; Future.sequence only collects the results. So when you run this for a collection of, let's say, 3000 columns:

    Future.sequence(columns.map(col => insert(col)))
    

    you effectively start 3000 operations at once. Your error message shows the result: all 10 threads are active and the queue already holds 1000 tasks, so the executor rejects any further task.

    The solution is to process the input collection with Akka Streams. In your case, this means creating a Source from columns (not from rows). This will ensure that the executor is not overwhelmed with too many parallel operations. I haven't used alpakka-slick, but looking at the docs, the solution should look something like this:

    Source(columns)
      .via(
        Slick.flow(column => stringTable ++= column.rows) 
      )
      // further processing here
    

    What's more, if "columns" are coming from a message queue, it's possible that you don't even need an intermediate Seq[Column]. You may simply need to define a Source of Column that reads from the queue, and process it with a Slick flow.
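
To see the difference between the two submission patterns without a database, here is a minimal sketch that uses only the standard library. It is not Slick or Akka Streams code: newPool, fakeInsert, insertAllAtOnce and insertInGroups are made-up names, and a small ThreadPoolExecutor with a bounded queue stands in for Slick's executor (2 threads and 5 queue slots here, instead of the 10 and 1000 from the error message). The first call submits everything up front and should fail with a rejection; the second waits for each group of 4 to finish before submitting the next one, which is the same bounding idea a stream gives you.

```scala
import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object BoundedInsertSketch {

  // Assumption: a stand-in for the database executor, with 2 threads and
  // room for 5 queued tasks. The default policy rejects anything beyond that.
  def newPool(): ThreadPoolExecutor =
    new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](5))

  // Hypothetical replacement for db.run(...): the task is handed to the pool
  // the moment this method is called, and a rejection fails the Future.
  def fakeInsert(pool: ThreadPoolExecutor, id: Int): Future[Int] = {
    val promise = Promise[Int]()
    val task = new Runnable {
      def run(): Unit = {
        Thread.sleep(50) // pretend to talk to the database
        promise.success(id)
      }
    }
    try pool.execute(task)
    catch { case e: RejectedExecutionException => promise.failure(e) }
    promise.future
  }

  // Same shape as Future.sequence(columns.map(insert)): every task is
  // submitted before any of them has finished.
  def insertAllAtOnce(pool: ThreadPoolExecutor, ids: Seq[Int])(implicit ec: ExecutionContext): Future[Seq[Int]] =
    Future.sequence(ids.map(id => fakeInsert(pool, id)))

  // Bounded variant: a group is only submitted inside flatMap, that is,
  // after the previous group has completed.
  def insertInGroups(pool: ThreadPoolExecutor, ids: Seq[Int], parallelism: Int)(implicit ec: ExecutionContext): Future[Seq[Int]] =
    ids.grouped(parallelism).foldLeft(Future.successful(Vector.empty[Int])) { (acc, group) =>
      acc.flatMap { done =>
        Future.sequence(group.map(id => fakeInsert(pool, id))).map(done ++ _)
      }
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val ids = 1 to 24

    val pool1 = newPool()
    val eager = Try(Await.result(insertAllAtOnce(pool1, ids), 10.seconds))
    println(s"all at once, succeeded: ${eager.isSuccess}")
    pool1.shutdown()

    val pool2 = newPool()
    val bounded = Await.result(insertInGroups(pool2, ids, parallelism = 4), 10.seconds)
    println(s"in groups of 4, completed inserts: ${bounded.size}")
    pool2.shutdown()
  }
}
```

The group size has to stay below what the executor can accept (threads plus queue slots), otherwise the bounded variant runs into the same rejection. With a stream you get this bound from the stream itself instead of from a hand-written fold.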