akka-stream, akka-http

Akka HTTP streaming API with cycles never completes


I'm building an application where I take a request from a user, call a REST API to get back some data, then based on that response, make another HTTP call and so on. Basically, I'm processing a tree of data where each node in the tree requires me to recursively call this API, like this:

     A
    / \
   B   C
  / \   \
 D   E   F

I'm using Akka HTTP with Akka Streams to build the application, so I'm using its streaming API, like this:

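// Assumes an implicit ActorSystem in scope; takeUserData, createRequest,
// processResponse and extractSubtree are the application's own Flow stages.
import akka.http.scaladsl.Http
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, Merge}

// connection-level Flow[HttpRequest, HttpResponse, _] to localhost
val httpFlow = Http().outgoingConnection(host = "localhost")
val flow = GraphDSL.create() { implicit builder =>
   import GraphDSL.Implicits._

   val input = builder.add(takeUserData)
   val merge = builder.add(Merge[Data](2))
   val bcast = builder.add(Broadcast[ResponseData](2))

   input ~> merge ~> createRequest ~> httpFlow ~> processResponse ~> bcast
            merge <~         extractSubtree                      <~ bcast

   FlowShape(input.in, bcast.out(1))
}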

I understand that recursion in an Akka Streams application is best handled outside the stream, but since I call the HTTP flow recursively to get each subtree of data, I wanted to make sure the flow was properly backpressured in case the API becomes overloaded.

The problem is that this stream never completes. If I hook it up to a simple source like this:

import akka.stream.scaladsl.{Sink, Source}

val source = Source.single(data)
val sink = Sink.seq[ResponseData]

// materializes a Future[Seq[ResponseData]]
source.via(flow).runWith(sink)

The stream prints that it has processed all the data in the tree, then stops printing anything and just idles forever without completing.

I read the documentation about cycles, which suggests putting a MergePreferred in there, but that didn't seem to help. A related question helped, but I don't understand why MergePreferred doesn't prevent the deadlock, since, unlike the documentation's example, elements leave the stream at each level of the tree.

Why doesn't MergePreferred avoid the deadlock, and is there another way of doing this?
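A minimal, self-contained sketch of the same symptom, assuming Akka 2.6+ and replacing the HTTP round trip with a hypothetical toy tree of Ints (`extractSubtree` below is an illustrative stand-in, not the real stage): each node n has one child n + 1 until n reaches 3, so the feedback loop clearly dries up, yet with a MergePreferred (default `eagerComplete = false`) the stream still never completes.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, MergePreferred, Sink, Source}

import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._

object CycleNeverCompletes {
  // Returns true if the stream completed within the timeout.
  def run(timeout: FiniteDuration = 2.seconds): Boolean = {
    implicit val system: ActorSystem = ActorSystem("cycle-demo")
    try {
      // Hypothetical stand-in for extractSubtree: node n has a single child
      // n + 1 until n reaches 3, which is a leaf (no children).
      val extractSubtree = Flow[Int].mapConcat(n => if (n < 3) List(n + 1) else Nil)

      val cyclic = Flow.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val merge = b.add(MergePreferred[Int](1)) // eagerComplete = false by default
        val bcast = b.add(Broadcast[Int](2))

        merge ~> Flow[Int].map { n => println(s"processing $n"); n } ~> bcast
        merge.preferred <~ extractSubtree <~ bcast

        // outside input enters at merge.in(0); results leave via bcast.out(1)
        FlowShape(merge.in(0), bcast.out(1))
      })

      val result = Source.single(1).via(cyclic).runWith(Sink.seq)

      // 1, 2 and 3 are all printed, but the merge waits for extractSubtree,
      // which waits for bcast, which waits for the merge: the Future never completes.
      try { Await.result(result, timeout); true }
      catch { case _: TimeoutException => false }
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"completed: ${run()}")
}
```

Swapping MergePreferred for a plain Merge gives the same result, because both keep running until every input, including the feedback input, has completed.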


Solution

  • Unless eagerComplete is true, MergePreferred completes only when all of its inputs have completed. This is generally true of stages in Akka Streams: completion flows downstream from the sources.

    So the merge can't propagate completion until both the outside input and extractSubtree have signalled completion. Without knowing the stages in that flow, extractSubtree most likely won't signal completion until bcast does, which (again, most likely) won't happen until processResponse does, which won't happen until httpFlow does, which won't happen until createRequest does, which won't happen until merge does. Because detecting such a cycle is impossible in general (some stages decide entirely dynamically when to complete), Akka Streams effectively takes the position that if you create a cycle like this, it's up to you to decide how to break it.

    As you've noticed, setting eagerComplete to true changes this behavior, but not helpfully: the merge then completes as soon as any input completes, which here will always be the outside input, because of the cycle. When the merge completes, it cancels its demand on extractSubtree (which by itself could cancel the rest of the downstream, depending on whether the Broadcast has eagerCancel set), so at least some elements emitted by extractSubtree will likely never be processed.

    If you're absolutely sure that completion of the input means the cycle will eventually dry up, you can keep eagerComplete = false, provided you have some way to complete extractSubtree once the cycle is dry and the input has completed. A broad outline for doing this (without knowing what, specifically, is in extractSubtree):

    • map everything coming into extractSubtree from bcast into a Some of the input
    • prematerialize a Source.actorRef to which you can send a None, and keep the ActorRef (the materialized value of that source)
    • merge the Some-wrapped elements with that prematerialized source
    • when extracting the subtree, use a statefulMapConcat stage to track a) whether a None has been seen and b) how many subtrees are pending (initially 1; for each node, add its number of direct children minus 1, so a leaf subtracts 1). If a None has been seen and no subtrees are pending, emit List(None); otherwise emit a List of each child subtree wrapped in a Some
    • follow it with takeWhile(_.isDefined), which completes as soon as it sees a None
    • if you have more complex things (e.g. side effects) in extractSubtree, you'll have to figure out where to put them
    • before merging the outside input, pass it through a watchTermination stage, and in the callback on its Future (on success) send a None to the ActorRef you got when prematerializing the Source.actorRef for extractSubtree. Thus, when the input completes, watchTermination fires successfully and effectively tells extractSubtree to watch for the moment it has finished the in-flight tree.
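The outline above can be sketched end to end as follows. This is a sketch under stated assumptions, not a definitive implementation: it assumes Akka 2.6+ (for the four-argument Source.actorRef), a single root element, and a hypothetical in-memory tree (A to F from the question) in place of the HTTP round trip. The names tree, childrenOf, tracker and fetch are illustrative only.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{CompletionStrategy, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object TreeWalk {
  // Hypothetical toy tree from the question; in the real app the children
  // would be derived from each HTTP response.
  val tree: Map[String, List[String]] =
    Map("A" -> List("B", "C"), "B" -> List("D", "E"), "C" -> List("F"))
  def childrenOf(node: String): List[String] = tree.getOrElse(node, Nil)

  // The statefulMapConcat logic from the outline.
  def tracker(): Option[String] => List[Option[String]] = {
    var pending = 1       // the single root, which enters via the outside input
    var inputDone = false // set once the None sent to stopRef arrives
    (msg: Option[String]) =>
      msg match {
        case None =>
          inputDone = true
          if (pending == 0) List[Option[String]](None) else Nil
        case Some(node) =>
          val kids = childrenOf(node)
          pending += kids.size - 1 // a leaf subtracts 1
          val out: List[Option[String]] = kids.map(Some(_))
          if (inputDone && pending == 0) out :+ None else out
      }
  }

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("tree-walk")
    try {
      // Prematerialized source: sending None to stopRef tells the tracker
      // that the outside input has completed.
      val (stopRef, stopSource) =
        Source.actorRef[Option[String]](
          completionMatcher = PartialFunction.empty[Any, CompletionStrategy],
          failureMatcher = PartialFunction.empty[Any, Throwable],
          bufferSize = 16,
          overflowStrategy = OverflowStrategy.fail
        ).preMaterialize()

      val extractSubtree: Flow[String, String, NotUsed] =
        Flow[String]
          .map(Option(_))                    // wrap each node in a Some
          .merge(stopSource)                 // interleave the None signal
          .statefulMapConcat(() => tracker())
          .takeWhile(_.isDefined)            // completes on the first None
          .collect { case Some(child) => child }

      // Hypothetical stand-in for createRequest ~> httpFlow ~> processResponse
      val fetch = Flow[String].map { n => println(s"fetched $n"); n }

      // When the outside input completes successfully, signal the tracker.
      val watchedInput = Flow[String].watchTermination() { (_, done) =>
        done.foreach(_ => stopRef ! None)(system.dispatcher)
        NotUsed
      }

      val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._
        val input = b.add(watchedInput)
        val merge = b.add(Merge[String](2)) // eagerComplete = false
        val bcast = b.add(Broadcast[String](2))

        input ~> merge ~> fetch ~> bcast
                 merge <~ extractSubtree <~ bcast

        FlowShape(input.in, bcast.out(1))
      })

      Await.result(Source.single("A").via(flow).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(TreeWalk.run())
}
```

Once the tracker emits its None, takeWhile completes extractSubtree, so both merge inputs are finished and completion can flow down to the sink. Because pending starts at 1, this sketch handles only a single root; with a multi-element input you would also have to count each element arriving from the outside, for example by routing a notification for it to the tracker.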