I'm currently wondering how the Slick (JDBC) connector for Alpakka works under the hood, and I can't really find an answer in the docs.

Consider a use case where I want to process a large number of records selected from a database. Can I simply use a single SELECT * FROM [TABLE] in one stream, or does it make sense to start multiple streams, one after the other, each fetching a page, e.g. SELECT * FROM [TABLE] LIMIT 0,1000?

I hope/think that the reactive nature of the Alpakka Slick connector takes care of fetching records from the database only when the stream needs them, so that I can simply use SELECT * FROM [TABLE].

Can anyone give me some insights or point me to some good docs to read through?
Consider the source code for Alpakka's Slick.source method:
/**
 * Scala API: creates a Source[T, NotUsed] that performs the
 * specified query against the (implicitly) specified
 * Slick database and streams the results.
 * This works for both "typed" Slick queries
 * and "plain SQL" queries.
 *
 * @param streamingQuery The Slick query to execute, which can
 *                       be either a "typed" query or a "plain SQL"
 *                       query produced by one of the Slick "sql..."
 *                       String interpolators
 * @param session The database session to use.
 */
def source[T](
    streamingQuery: StreamingDBIO[Seq[T], T]
)(implicit session: SlickSession): Source[T, NotUsed] =
  Source.fromPublisher(session.db.stream(streamingQuery))
The call to session.db.stream(streamingQuery) above results in a DatabasePublisher, which is a Reactive Streams Publisher that is passed to Akka Stream's Source.fromPublisher. Don't be concerned about trying to create multiple streams for subsets of the data; you can safely use a query that retrieves all the rows in a table and work with the resulting Source as a single stream.
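To illustrate, here is a hedged sketch of consuming a full-table query as one backpressured stream (the Items table definition, the "slick-postgres" config path, and the println processing are placeholders, not from the original answer). Because the Source is backed by a Reactive Streams publisher, rows are requested as downstream stages signal demand rather than loaded eagerly:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.Sink
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.Future

object StreamAllRows extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  // Hypothetical config path; point this at your own database config.
  implicit val session: SlickSession = SlickSession.forConfig("slick-postgres")
  import system.dispatcher

  // Hypothetical table definition; substitute your own schema.
  class Items(tag: Tag) extends Table[(Long, String)](tag, "items") {
    def id = column[Long]("id", O.PrimaryKey)
    def name = column[String]("name")
    def * = (id, name)
  }

  // One stream over the whole table: demand from the downstream
  // stages controls how quickly rows are pulled from the database.
  val done: Future[Done] =
    Slick
      .source(TableQuery[Items].result)
      .mapAsync(parallelism = 4)(row => Future(println(row))) // stand-in for real processing
      .runWith(Sink.ignore)

  done.onComplete(_ => system.terminate())
}
```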
One thing to note is that you may need to configure a few settings as mentioned not in the Alpakka documentation, but in the Slick documentation:
Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.
So if you're using PostgreSQL, for example, then your Source
might look something like the following:
val source =
  Slick.source(
    TableQuery[Items]
      .result
      .withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      )
      .transactionally
  )
TableQuery[Items].result
returns all the rows in the table associated with Items
.
The documentation notwithstanding, I've successfully used Slick's DatabasePublisher
in conjunction with Akka Streams to retrieve and update millions of rows from a table in PostgreSQL without setting withStatementParameters
or transactionally
. Try it without those settings:
val source = Slick.source(TableQuery[Items].result)
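And since Slick.source also accepts "plain SQL" streaming queries (per the docstring above), the same table could be streamed via the sql interpolator. A sketch, assuming a hypothetical items table with id and name columns and a "slick-postgres" config path:

```scala
import akka.NotUsed
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.Source
import slick.jdbc.GetResult

// Hypothetical config path; point this at your own database config.
implicit val session: SlickSession = SlickSession.forConfig("slick-postgres")
import session.profile.api._

// Row type and a GetResult that maps each JDBC row to it.
case class Item(id: Long, name: String)
implicit val getItem: GetResult[Item] =
  GetResult(r => Item(r.nextLong(), r.nextString()))

// The sql interpolator plus .as[Item] yields a streamable
// "plain SQL" action that Slick.source accepts directly.
val plainSource: Source[Item, NotUsed] =
  Slick.source(sql"SELECT id, name FROM items".as[Item])
```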