I am using Slick with Akka Streams to load a large number of records (~2M) from the database (PostgreSQL) and write them to an S3 file. However, I'm noticing that my code below works for around ~50k records but fails for anything over roughly the 100k mark.
val allResults: Future[Seq[MyEntityImpl]] =
MyRepository.getAllRecords()
val results: Future[MultipartUploadResult] = Source
.fromFuture(allResults)
.map(seek => seek.toList)
.mapConcat(identity)
.map(myEntity => myEntity.toPSV + "\n")
.map(s => ByteString(s))
.runWith(s3Sink)
Below is a sample of what MyEntityImpl looks like:
case class MyEntityImpl(partOne: MyPartOne, partTwo: MyPartTwo) {
  def toPSV: String = partOne.toPSV + partTwo.toPSV
}
case class MyPartOne(field1: String, field2: String) {
  def toPSV: String = s"$field1|$field2"
}
case class MyPartTwo(field1: String, field2: String) {
  def toPSV: String = s"$field1|$field2"
}
I am looking for a way to do this in a more reactive way so that it doesn't run out of memory.
Underlying Problem
The problem is that you are pulling all of the records from the database into local memory before dispatching them to the s3Sink.

The first place the data is pulled into memory is likely your MyRepository.getAllRecords() method: most, if not all, Seq implementations are in-memory based. The second place where you're definitely using local memory is seek.toList, because a List stores all of its data in memory.
Solution
Instead of returning a Seq from getAllRecords, you should return a Slick-backed Akka Source directly. This ensures that your materialized stream only needs memory for the transient processing steps on the way to S3.
If your method definition changed to:
def getAllRecords() : Source[MyEntityImpl, _]
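One way to implement that signature is via Slick's reactive-streams support: db.stream returns a Publisher that adapts directly into an Akka Streams Source. A minimal sketch follows; the dbConfig name, the MyEntityTable mapping, and the "mydb" config path are assumptions, not from the question. Note that for PostgreSQL the query must set a fetchSize and run transactionally, otherwise the JDBC driver buffers the entire result set anyway:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import slick.jdbc.PostgresProfile.api._

object MyRepository {
  // Hypothetical database handle and table mapping.
  val db: Database = Database.forConfig("mydb")
  val myEntities = TableQuery[MyEntityTable]

  def getAllRecords(): Source[MyEntityImpl, NotUsed] = {
    // PostgreSQL only streams when a fetchSize is set and the statement
    // runs inside a transaction; otherwise the driver loads all rows.
    val streamingQuery = myEntities.result
      .withStatementParameters(fetchSize = 1000)
      .transactionally

    // db.stream yields a Reactive Streams Publisher, which plugs
    // straight into Akka Streams with backpressure intact.
    Source.fromPublisher(db.stream(streamingQuery))
  }
}
```

With this in place, rows are fetched in batches of 1000 as the downstream S3 sink demands them, so memory use stays flat regardless of the total row count.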
Then the rest of the stream would operate in a reactive manner:
MyRepository
.getAllRecords()
.map(myEntity => myEntity.toPSV + "\n")
.map(ByteString.apply)
.runWith(s3Sink)
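For completeness, here is one way the s3Sink itself could be constructed with the Alpakka S3 connector, which matches the MultipartUploadResult type in the question. This is a sketch under the assumption you are on a recent Alpakka release; the bucket and key names are placeholders:

```scala
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import scala.concurrent.Future

// Multipart upload buffers and sends the stream in chunks,
// so the whole file never has to fit in memory either.
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
  S3.multipartUpload("my-bucket", "exports/records.psv")
```

Because both ends of the stream are chunked (Slick streams rows in, the multipart upload streams bytes out), the 2M-record export runs with constant memory.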