Tags: scala, amazon-s3, alpakka, scala-2

S3.getObject is truncating file contents in Alpakka 4.0.0


I have a block of code like this:

    def downloadFilesSource(bucketName: String, primaryKey: String)(
      implicit ec: ExecutionContext
    ): Source[(Source[ByteString, NotUsed], String), NotUsed] =
      S3.listBucket(bucketName, prefix = Some(primaryKey))
        .mapAsync(configuration.config.downloadParallelism.value)(
          (r: ListBucketResultContents) =>
            Future {
              S3.download(r.bucketName, r.key).zip(Source.single(r.key))
            }
        )
        .flatMapConcat(identity)
        .map {
          case (Some(x), key) => (x._1, key)
          case (None, _)      => throw new RuntimeException()
        }

which downloads all the files in an Amazon S3 bucket and returns a Source of each file's contents (itself a Source of ByteString) tupled with the file's key.

In Alpakka 4.0.0 the S3.download method has been removed. The following code seemed like a good replacement:

    def downloadFilesSource(bucketName: String, primaryKey: String)(
      implicit ec: ExecutionContext
    ): Source[(ByteString, String), NotUsed] =
      S3.listBucket(bucketName, prefix = Some(primaryKey))
        .mapAsync(configuration.config.downloadParallelism.value)(
          (r: ListBucketResultContents) =>
            Future {
              S3.getObject(r.bucketName, r.key).zip(Source.single(r.key))
            }
        )
        .flatMapConcat(identity)

However, the contents of each file are always truncated.

As an experiment, I also tried (in the function body):

    S3.listBucket(bucketName, prefix = Some(primaryKey))
      .map((r: ListBucketResultContents) =>
        S3.getObject(r.bucketName, r.key).zip(Source.single(r.key))
      )
      .flatMapConcat(identity)

in case I wasn't waiting on the future correctly, but the files are truncated in the same way. I assume there's something I'm just missing about the streaming nature of Alpakka.

I've looked at "Alpakka and S3 truncating downloaded files", but I don't see that the answers there are relevant :(


Solution

  • The source of the problems I was having seems to be that, as the Akka docs imply, a zip of sources completes as soon as any one of the zipped streams completes. Source.single is defined to complete after emitting its single element, so the zip stops after the first chunk that S3.getObject emits and all subsequent chunks are discarded. The old code was unaffected because S3.download emitted exactly one element (an Option wrapping the nested Source of the contents), which paired one-to-one with Source.single, whereas S3.getObject emits the object's bytes directly as a stream of ByteString chunks. (A standalone demonstration of this zip behavior appears at the end of this answer.)

    Handling the get like this, and mapping the complete ByteString to a tuple afterwards (in place of the zip), avoids the truncation:

            S3.getObject(r.bucketName, r.key)
              .fold(ByteString.empty)(_ ++ _)
              .map(byteString => (byteString, r.key))
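
    Putting it together, the whole method becomes the following (a sketch assembled from the snippets above rather than a verified drop-in: configuration.config.downloadParallelism is the question's own configuration value, and the imports are the usual Akka/Alpakka ones):

        import akka.NotUsed
        import akka.stream.alpakka.s3.ListBucketResultContents
        import akka.stream.alpakka.s3.scaladsl.S3
        import akka.stream.scaladsl.Source
        import akka.util.ByteString

        import scala.concurrent.{ExecutionContext, Future}

        def downloadFilesSource(bucketName: String, primaryKey: String)(
          implicit ec: ExecutionContext
        ): Source[(ByteString, String), NotUsed] =
          S3.listBucket(bucketName, prefix = Some(primaryKey))
            .mapAsync(configuration.config.downloadParallelism.value)(
              // downloadParallelism comes from the question's own configuration
              (r: ListBucketResultContents) =>
                Future {
                  S3.getObject(r.bucketName, r.key)
                    .fold(ByteString.empty)(_ ++ _)          // concatenate every chunk of one object
                    .map(byteString => (byteString, r.key))  // pair the full contents with its key
                }
            )
            .flatMapConcat(identity)

    Note that the fold buffers a whole object in memory before emitting it, which is fine for small files but worth keeping in mind for large ones.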
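
    The zip behavior is also easy to observe in isolation with plain sources (a minimal, runnable sketch with no S3 involved; the ActorSystem name is arbitrary):

        import akka.actor.ActorSystem
        import akka.stream.scaladsl.{Sink, Source}

        object ZipTruncationDemo extends App {
          implicit val system: ActorSystem = ActorSystem("zip-demo")
          import system.dispatcher

          // Five elements stand in for the chunks that S3.getObject emits.
          Source(1 to 5)
            .zip(Source.single("key"))  // Source.single completes after one element
            .runWith(Sink.seq)
            .foreach { pairs =>
              println(pairs)  // prints Vector((1,key)): elements 2 to 5 were dropped
              system.terminate()
            }
        }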