Search code examples
httpakkathreadpoolblocking

Making blocking http call in akka stream processing


I am new to akka and still trying to understand the different akka and streaming concepts. For some new feature i need to add a http call to already existing stream which is working on an internal object. Something like this -

    val step1Flow = Flow[SampleObject].filter(...--Filtering condition--...)
    val step2Flow = Flow[SampleObject].map(obj => {
         ...
         -- Business logic to update values in the obj --
         ...
    })
    ...
override val flowGraph: Flow[SampleObject, SampleObject, NotUsed] =
bufferIn.via(Flow.fromGraph(GraphDSL.create() {
  implicit builder =>
    import GraphDSL.Implicits._
    ...
    val step1 = builder.add(step1Flow)
    val step2 = builder.add(step2Flow)
    val step3 = builder.add(step3Flow)
    ...

    source ~> step1 ~> step2 ~> step3 ~> merge
    ...
}

I need to add the new http request flow (lets call it newFlow) after step1. All these flow have Inlet and Outlet as SampleObject. Now my understanding is that the newFlow would need to be blocking because the outlet need to be SampleObject only. For that I have used Await function on the http call future. The code looks like this -

val responseFuture: Future[(Try[HttpResponse], SomeContext)] =
  Source
    .single(httpRequest -> context)
    .via(Retry(retrySettings).join(clientFlow))
    .runWith(Sink.head)
...
val (httpTry, passedAlongContext) = Await.result(responseFuture, 30.seconds)
-- logic to process response and return SampleObject --

Now this works fine but i think there should be a better way to do this without using wait. Also i think this would block the main thread till the request completes, which is going to affect the app throughput. Could you please guide if the approach i used is correct or not. And how do i make use of some other thread pool to handle these blocking call so my main threadpool is not affected

This question seems very similar to mine but i do not understand it completely - connect Akka HTTP to Akka stream . Also i can't change the step2 or further flows.

EDIT : Added some code details for the stream


Solution

  • I ended up using the approach mentioned in the question because i couldn't find anything better after looking around. Adding this step decreased the throughput of my application as expected, but there are approaches to increase that can be used. Check these awesome blogs by Colin Breck -

    To summarize -

    1. Use Asynchronous Boundaries for flows which are blocking.
    2. Use Futures if possible and add callbacks to futures. There are several ways to do that.
    3. Use Buffers. There are several types of buffers available, choose what suits your needs.

    Other than these, you can use inbuilt flows like -

    1. Use "Broadcast" to broadcast your events to multiple consumers.
    2. Use "Partition" to partition your stream into multiple streams based on some condition.
    3. Use "Balance" to partition your stream when there is no logical way to partition your events or they all could have different work loads.

    You could use any one or multiple things from above options.