Search code examples
scalaakkaakka-streamakka-http

How to bind akka http with akka streams?


I'm trying to use streams instead of pure actors to handle http requests and I came with the following code:

trait ImagesRoute {

  val log = LoggerFactory.getLogger(this.getClass)

  implicit def actorRefFactory: ActorRefFactory
  implicit def materializer: ActorMaterializer

  val source =
    Source
      .actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
      .via(Flow[Image].mapAsync(1)(ImageRepository.add))
      .toMat(Sink.asPublisher(true))(Keep.both)

  val route = {
    pathPrefix("images") {
      pathEnd {
        post {
          entity(as[Image]) { image =>

            val (ref, publisher) = source.run()

            val addFuture = Source.fromPublisher(publisher)

            val future = addFuture.runWith(Sink.head[Option[Image]])

            ref ! image

            onComplete(future.mapTo[Option[Image]]) {
              case Success(img) =>
                complete(Created, img)

              case Failure(e) =>
                log.error("Error adding image resource", e)
                complete(InternalServerError, e.getMessage)
            }
          }
        }
      }
    }
  }
}

I'm not sure if this is the correct way to do that, or even if this is a good approach or if I should use an actor to interact with the route, using the ask pattern and then inside the actor, stream everything.

Any ideas?


Solution

  • If you're only expecting 1 image from the entity then you don't need to create a Source from an ActorRef and you don't need Sink.asPublisher, you can simply use Source.single:

    def imageToComplete(img : Option[Image]) : StandardRoute = 
      img.map(i => complete(Created, i))
         .getOrElse {
           log error ("Error adding image resource", e)
           complete(InternalServerError, e.getMessage
         }
    
    ...
    
    entity(as[Image]) { image =>
    
      val future : Future[StandardRoute] = 
        Source.single(image)
              .via(Flow[Image].mapAsync(1)(ImageRepository.add))
              .runWith(Sink.head[Option[Image]])
              .map(imageToComplete)
    
      onComplete(future)
    }
    

    Simplifying your code further, the fact that you are only processing 1 image means that Streams are unnecessary since there is no need for backpressure with just 1 element:

    val future : Future[StandardRoute] = ImageRepository.add(image)
                                                        .map(imageToComplete)
    
    onComplete(future)
    

    In the comments you indicated

    "this is just a simple example, but the stream pipeline should be bigger doing a lot of things like contacting external resources and eventually back pressure things"

    This would only apply if your entity was a stream of images. If you're only ever processing 1 image per HttpRequest then backpressure never applies, and any stream you create will be a slower version of a Future.

    If your entity is in fact a stream of Images, then you could use it as part of stream:

    val byteStrToImage : Flow[ByteString, Image, _] = ???
    
    val imageToByteStr : Flow[Image, Source[ByteString], _] = ???
    
    def imageOptToSource(img : Option[Image]) : Source[Image,_] =
      Source fromIterator img.toIterator
    
    val route = path("images") {
      post {
        extractRequestEntity { reqEntity =>
    
          val stream = reqEntity.via(byteStrToImage)
                                .via(Flow[Image].mapAsync(1)(ImageRepository.add))
                                .via(Flow.flatMapConcat(imageOptToSource))
                                .via(Flow.flatMapConcat(imageToByteStr))
    
          complete(HttpResponse(status=Created,entity = stream))
        }
      }
    }