scala, slick, akka-stream, alpakka

Does it make sense to use a paging SQL statement when using the Slick (JDBC) Connector for Alpakka?


I'm currently wondering how the Slick (JDBC) Connector for Alpakka works under the hood, and I can't really find an answer in the docs.

Consider a use case where I want to process a large number of records selected from a database. Can I simply use a SELECT * FROM [TABLE] in a single stream, or does it make sense to start a separate stream per page (one after the other), e.g. SELECT * FROM [TABLE] LIMIT 0,1000?

I hope/think that the reactive nature of the Alpakka Slick connector takes care of fetching records from the database only when the stream needs them, so that I can use the SELECT * FROM [TABLE] ...

Can anyone give me some insights or some good docs to read through?


Solution

  • Consider the source code for Alpakka's Slick.source method:

    /**
     * Scala API: creates a Source[T, NotUsed] that performs the
     *            specified query against the (implicitly) specified
     *            Slick database and streams the results.
     *            This works for both "typed" Slick queries
     *            and "plain SQL" queries.
     *
     * @param streamingQuery The Slick query to execute, which can
     *                       be either a "typed" query or a "plain SQL"
     *                       query produced by one of the Slick "sql..."
     *                       String interpolators
     * @param session The database session to use.
     */
    def source[T](
        streamingQuery: StreamingDBIO[Seq[T], T]
    )(implicit session: SlickSession): Source[T, NotUsed] =
      Source.fromPublisher(session.db.stream(streamingQuery))
    

    session.db.stream(streamingQuery) above results in a DatabasePublisher, which is a Reactive Streams Publisher that is passed to Akka Stream's Source.fromPublisher. Don't be concerned with creating multiple streams for subsets of the data; you can safely use a query that retrieves all the rows in a table and work with the resulting Source as a single stream.
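
    To make the demand-driven behavior concrete, here is a minimal sketch of such a single-stream source. The config path "slick-postgres" and the plain-SQL query are illustrative assumptions; adjust them to your setup:

    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}

    // Assumed config path; point this at your own Slick configuration.
    implicit val session: SlickSession = SlickSession.forConfig("slick-postgres")
    import session.profile.api._

    // One stream over the whole table: rows are fetched from the
    // database only as downstream demand arrives, via Reactive
    // Streams backpressure.
    val allRows: Source[String, NotUsed] =
      Slick.source(sql"SELECT name FROM items".as[String])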

    One thing to note is that you may need to configure a few settings, mentioned not in the Alpakka documentation but in the Slick documentation:

    Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.

    So if you're using PostgreSQL, for example, then your Source might look something like the following:

    import slick.jdbc.{ResultSetConcurrency, ResultSetType}

    // assumes `import session.profile.api._` is in scope
    val source =
      Slick.source(
        TableQuery[Items]
          .result
          .withStatementParameters(
            rsType = ResultSetType.ForwardOnly,
            rsConcurrency = ResultSetConcurrency.ReadOnly,
            fetchSize = 10
          )
          .transactionally)
    

    TableQuery[Items].result returns all the rows in the table associated with Items.
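
    For completeness, a hypothetical Items table definition that the snippet above assumes could look like the following (the table and column names are illustrative, and `import session.profile.api._` is assumed to be in scope):

    // Hypothetical Slick table mapping for an "items" table.
    class Items(tag: Tag) extends Table[(Int, String)](tag, "items") {
      def id = column[Int]("id", O.PrimaryKey)
      def name = column[String]("name")
      def * = (id, name)
    }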

    The documentation notwithstanding, I've successfully used Slick's DatabasePublisher in conjunction with Akka Streams to retrieve and update millions of rows from a table in PostgreSQL without setting withStatementParameters or transactionally. Try it without those settings:

    val source = Slick.source(TableQuery[Items].result)
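
    To actually run either source you still need an Akka ActorSystem in scope (on Akka 2.6+ it implicitly provides the materializer); a minimal sketch:

    import akka.actor.ActorSystem

    implicit val system: ActorSystem = ActorSystem("slick-stream")

    // Rows are printed as they are pulled from the database; the
    // returned Future[Done] completes when the stream finishes.
    val done = source.runForeach(println)

    done.onComplete(_ => session.close())(system.dispatcher)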