Tags: scala, akka, slick, akka-stream, reactive-streams

Running out of memory when loading a large number of records from the database


I am using Slick with Akka Streams to load a large number of records (~2M) from the database (PostgreSQL) and write them to an S3 file. However, I'm noticing that the code below works for around ~50k records but fails for anything over the ~100k mark.

  val allResults: Future[Seq[MyEntityImpl]] =
    MyRepository.getAllRecords()

  val results: Future[MultipartUploadResult] = Source
    .fromFuture(allResults)
    .map(seek => seek.toList)
    .mapConcat(identity)
    .map(myEntity => myEntity.toPSV + "\n")
    .map(s => ByteString(s))
    .runWith(s3Sink)
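
For reference, s3Sink is an Alpakka S3 multipart-upload sink. With a recent Alpakka S3 connector it can be built roughly like this (the bucket and key names here are just placeholders):

  import akka.stream.alpakka.s3.MultipartUploadResult
  import akka.stream.alpakka.s3.scaladsl.S3
  import akka.stream.scaladsl.Sink
  import akka.util.ByteString
  import scala.concurrent.Future

  // Multipart upload streams chunks to S3 as they arrive,
  // so the whole file is never held in memory at once.
  // Placeholder bucket and key:
  val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
    S3.multipartUpload("my-bucket", "exports/my-entities.psv")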

Below is a sample of what myEntity looks like:

case class MyEntityImpl(partOne: MyPartOne, partTwo: MyPartTwo) {
  def toPSV: String = partOne.toPSV + partTwo.toPSV
}
case class MyPartOne(field1: String, field2: String) {
  def toPSV: String = s"$field1|$field2"
}
case class MyPartTwo(field1: String, field2: String) {
  def toPSV: String = s"$field1|$field2"
}

I am looking for a more reactive way to do this so that it doesn't run out of memory.


Solution

  • Underlying Problem

    The problem is that you are pulling all of the records from the database into local memory before dispatching them to the s3Sink.

    The first place the data is being pulled into memory is likely in your MyRepository.getAllRecords() method. Most, if not all, Seq implementations are in-memory. The second place where you're definitely using local memory is in seek.toList, because a List stores all of the data in memory.

    Solution

    Instead of returning a Seq from getAllRecords, return a Slick-based Akka Source directly. This ensures that your materialized stream only needs memory for the transient processing steps before the data goes to S3 (a sketch of such a Source follows the stream example below).

    If your method definition changed to:

    def getAllRecords(): Source[MyEntityImpl, _]
    

    Then the rest of the stream would operate in a reactive manner:

    MyRepository
      .getAllRecords()
      .map(myEntity => myEntity.toPSV + "\n")
      .map(ByteString.apply)
      .runWith(s3Sink)
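
    One way to implement getAllRecords() is with Slick's streaming support: db.stream produces a Reactive Streams Publisher that Source.fromPublisher can wrap. Below is a minimal sketch, assuming a hypothetical Slick table query myEntityQuery whose rows map to MyEntityImpl and an already-configured db; the statement parameters are what the PostgreSQL JDBC driver needs in order to stream rows instead of buffering the entire result set:

    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import slick.jdbc.PostgresProfile.api._
    import slick.jdbc.{ResultSetConcurrency, ResultSetType}

    def getAllRecords(): Source[MyEntityImpl, NotUsed] = {
      // myEntityQuery and db are assumed to exist in your repository
      val streamingAction = myEntityQuery.result
        .withStatementParameters(
          rsType = ResultSetType.ForwardOnly,
          rsConcurrency = ResultSetConcurrency.ReadOnly,
          fetchSize = 10000)  // rows fetched per database round trip
        .transactionally      // PostgreSQL streams only inside a transaction

      // db.stream returns a DatabasePublisher, a Reactive Streams Publisher
      Source.fromPublisher(db.stream(streamingAction))
    }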