Tags: scala, akka, akka-stream, reactive-streams, scala-streams

How to emulate Sink in akka streams?


I have a simple "save" function that uses akka-stream-alpakka's multipartUpload. It looks like this:

  def save(fileName: String): Future[AWSLocation] = {

    val uuid: String = s"${UUID.randomUUID()}"

    val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = s3Client.multipartUpload(s"$bucketName", s"$uuid/$fileName")

    val file = Paths.get(s"/tmp/$fileName")

    FileIO.fromPath(file).runWith(s3Sink).map(res => {
      AWSLocation(uuid, fileName, res.key)
    }).recover {
      case ex: S3Exception =>
        logger.error("Upload to S3 failed with s3 exception", ex)
        throw ex
      case ex: Throwable =>
        logger.error("Upload to S3 failed with an unknown exception", ex)
        throw ex
    }
  }

I want to test two cases for this function:

  1. multipartUpload succeeds and I get an AWSLocation (my case class) back.
  2. multipartUpload fails and I get an S3Exception.

So I thought I would stub multipartUpload to return my own sink, like this:

  val mockAmazonS3ProxyService: S3ClientProxy = mock[S3ClientProxy]

  val s3serviceMock: S3Service = mock[S3Service]

  override val fakeApplication: Application = GuiceApplicationBuilder()
    .overrides(bind[S3ClientProxy].toInstance(mockAmazonS3ProxyService))
    .router(Router.empty).build()

  "test" in {
    when(mockAmazonS3ProxyService.multipartUpload(anyString(), anyString())) thenReturn Sink(ByteString.empty, Future.successful(MultipartUploadResult(Uri(""),"","myKey123","",Some(""))))

    val res = s3serviceMock.save("someFileName").futureValue

    res.key shouldBe "myKey123"

  }

The issue is that I get Error:(47, 93) akka.stream.scaladsl.Sink.type does not take parameters. I understand that I can't create a Sink like this, but how can I? Or is there a better way to test this?


Solution

  • Consider redesigning your save method to make it more testable, so that each test can inject a specific sink that produces the outcome it needs (as mentioned by Bennie Krijger).

      def save(fileName: String): Future[AWSLocation] = {
        val uuid: String = s"${UUID.randomUUID()}"
        saveWith(uuid, fileName)(() => s3Client.multipartUpload(s"$bucketName", s"$uuid/$fileName"))
      }

      // Same upload logic, but the sink factory is a parameter so tests can inject a stub.
      // uuid is passed in because AWSLocation needs it; the distinct name avoids
      // overloading save on an identical first parameter list.
      def saveWith(uuid: String, fileName: String)(
        createS3UploadSink: () => Sink[ByteString, Future[MultipartUploadResult]]
      ): Future[AWSLocation] = {

        val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = createS3UploadSink()

        val file = Paths.get(s"/tmp/$fileName")

        FileIO
          .fromPath(file)
          .runWith(s3Sink)
          .map(res => {
            AWSLocation(uuid, fileName, res.key)
          })
          .recover {
            case ex: S3Exception =>
              logger.error("Upload to S3 failed with s3 exception", ex)
              throw ex
            case ex: Throwable =>
              logger.error("Upload to S3 failed with an unknown exception", ex)
              throw ex
          }
      }

    The test could then look like this:

    class MultipartUploadSpec extends TestKit(ActorSystem("multipartUpload")) with FunSpecLike {

      implicit val mat: Materializer = ActorMaterializer()

      describe("multipartUpload") {
        it("should pass failure") {
          // The stub sink ignores its input and materializes an already-failed Future.
          val result = saveWith("someUuid", "someFileName")(() => Sink.ignore.mapMaterializedValue(_ => Future.failed(new RuntimeException)))
          // assert result
        }

        it("should pass successfully") {
          // The stub sink materializes a successful result; constructor arguments as in the question.
          val result = saveWith("someUuid", "someFileName")(() =>
            Sink.ignore.mapMaterializedValue(_ => Future.successful(MultipartUploadResult(Uri(""), "", "myKey123", "", Some("")))))
          // assert result
        }
      }
    }
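The stubbed sinks used in the test above can be tried in isolation. The following is a minimal sketch, assuming only akka-stream is on the classpath (a version where ActorMaterializer is still available). FakeUploadResult, StubSinkSketch, succeedingSink and failingSink are hypothetical names introduced for illustration; FakeUploadResult stands in for Alpakka's MultipartUploadResult so the example does not depend on the S3 connector.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for MultipartUploadResult (assumption, not the Alpakka class).
final case class FakeUploadResult(key: String)

object StubSinkSketch {

  // Drains its input and materializes an already-completed successful Future.
  // Sink.ignore is a Sink[Any, Future[Done]]; because Sink is contravariant in its
  // input type, it can be used where a Sink[ByteString, ...] is expected.
  def succeedingSink(key: String): Sink[ByteString, Future[FakeUploadResult]] =
    Sink.ignore.mapMaterializedValue(_ => Future.successful(FakeUploadResult(key)))

  // Drains its input and materializes an already-failed Future.
  def failingSink(ex: Throwable): Sink[ByteString, Future[FakeUploadResult]] =
    Sink.ignore.mapMaterializedValue(_ => Future.failed(ex))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("stub-sink-sketch")
    implicit val mat: Materializer = ActorMaterializer()
    try {
      val data = Source.single(ByteString("payload"))

      // runWith returns the sink's materialized value, i.e. the stubbed Future.
      val ok = Await.result(data.runWith(succeedingSink("myKey123")), 3.seconds)
      assert(ok.key == "myKey123")

      val failed = Try(Await.result(data.runWith(failingSink(new RuntimeException("boom"))), 3.seconds))
      assert(failed.isFailure)
    } finally {
      system.terminate()
    }
  }
}
```

Note that the materialized Future here is completed up front and is not tied to the stream finishing, so a stub like this only exercises the code that handles the upload result. It does not verify that the file contents were actually read.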