I'm building an application where I take a request from a user, call a REST API to get back some data, then based on that response, make another HTTP call and so on. Basically, I'm processing a tree of data where each node in the tree requires me to recursively call this API, like this:
```
    A
   / \
  B   C
 / \   \
D   E   F
```
I'm using Akka HTTP with Akka Streams to build the application, so I'm using its streaming API, like this:
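```scala
import akka.http.scaladsl.Http
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge}

// Assumes an implicit ActorSystem in scope; takeUserData, createRequest,
// processResponse and extractSubtree are flows defined elsewhere.
val httpFlow = Http().outgoingConnection(host = "localhost")
val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val userData = b.add(takeUserData)
  val merge = b.add(Merge[Data](2))
  val bcast = b.add(Broadcast[ResponseData](2))
  userData.out ~> merge ~> createRequest ~> httpFlow ~> processResponse ~> bcast
  merge <~ extractSubtree <~ bcast // feed each subtree back into the merge
  FlowShape(userData.in, bcast.out(1))
})
```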
I understand that the recommended way to handle recursion in an Akka Streams application is outside of the stream, but since each subtree requires another call through the HTTP flow, I wanted to make sure the flow was properly backpressured in case the API becomes overloaded.
The problem is that this stream never completes. If I hook it up to a simple source like this:
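```scala
import akka.stream.scaladsl.{Sink, Source}

val source = Source.single(data)
val sink = Sink.seq[ResponseData]
source.via(flow).runWith(sink) // needs an implicit Materializer (or ActorSystem) in scope
```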
It prints that it has processed all the data in the tree, then stops printing anything and just idles forever.
I read the documentation about cycles, and the suggestion was to put a `MergePreferred` in there, but that didn't seem to help. This question helped, but I don't understand why `MergePreferred` wouldn't stop the deadlock, since unlike their example, the elements are removed from the stream at each level of the tree.

Why doesn't `MergePreferred` avoid the deadlock, and is there another way of doing this?
`MergePreferred` (unless `eagerComplete` is true) completes only when all of its inputs have completed, which is generally true of stages in Akka Streams (completion flows down from the start).
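A minimal sketch of this completion rule, using the plain `merge` operator (which, like `MergePreferred`, takes an `eagerComplete` flag that defaults to `false`) and assuming Akka 2.6 on Scala 2.13. `MergeCompletion` is a hypothetical name; `run` merges a finite source with `Source.never` and returns `None` if the stream is still running after one second.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object MergeCompletion {
  // Merges a finite source with one that never completes
  def run(eager: Boolean): Option[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("merge-completion")
    val result = Source(1 to 3)
      .merge(Source.never[Int], eagerComplete = eager)
      .runWith(Sink.seq)
    // Await throws a TimeoutException if the stream is still running; Try turns that into None
    try Try(Await.result(result, 1.second)).toOption
    finally system.terminate()
  }
}
```

With the default `eagerComplete = false`, the finite source finishing is not enough: the merge waits for `Source.never` too, so the result never arrives.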
So the merge can't propagate completion until both the input and `extractSubtree` signal completion. `extractSubtree` won't signal completion (most likely; I don't know which stages are in that flow) until `bcast` signals completion. With the same caveat at each step, that won't happen until `processResponse` signals completion, which won't happen until `httpFlow` does, which won't happen until `createRequest` does, which won't happen until `merge` does. Because detecting such a cycle is impossible in general (some stages decide when to complete entirely dynamically), Akka Streams effectively takes the position that if you create a cycle like this, it's up to you to determine how to break it.
As you've noticed, setting `eagerComplete` to true changes this behavior: the merge then completes as soon as any input completes, which in this case will always be the external input, thanks to the cycle. `merge` therefore completes and cancels demand on `extractSubtree`. Depending on whether the `Broadcast` has `eagerCancel` set, that cancellation could by itself shut down the rest of the stream, and it will likely leave at least some elements emitted by `extractSubtree` unprocessed.
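The same two behaviors show up in a cycle. In this minimal sketch (assuming Akka 2.6 on Scala 2.13), a hypothetical `HalvingCycle` feeds each number back as half its value until it reaches 1, standing in for the request/response loop; `run` returns `None` if the stream is still running after one second. With `eagerComplete = false` the stream keeps running after the last number has been emitted; with `eagerComplete = true` it completes, but numbers fed back into the cycle after the input finished can be dropped.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object HalvingCycle {
  // Feeds n back into the cycle as n / 2 until it reaches 1, so 8 yields 8, 4, 2, 1
  def run(eager: Boolean): Option[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("halving-cycle")
    val cycle = Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val merge = b.add(Merge[Int](2, eagerComplete = eager))
      val bcast = b.add(Broadcast[Int](2))
      merge ~> bcast
      bcast.out(0).filter(_ > 1).map(_ / 2) ~> merge.in(1) // the feedback edge
      FlowShape(merge.in(0), bcast.out(1))
    })
    val result = Source.single(8).via(cycle).runWith(Sink.seq)
    // None means the stream had not completed within the timeout
    try Try(Await.result(result, 1.second)).toOption
    finally system.terminate()
  }
}
```

In the eager case, how much of the sequence survives depends on timing inside the graph, so treat it as some prefix of 8, 4, 2, 1 rather than a fixed result.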
If you're absolutely sure that completion of the input means the cycle will eventually dry up, you can keep `eagerComplete = false`, provided you have some means to complete `extractSubtree` once the cycle is dry and the input has completed. A broad outline for going about this (without knowing what, specifically, is in `extractSubtree`):
1. Wrap everything flowing into `extractSubtree` from `bcast` in a `Some`.
2. Prematerialize a `Source.actorRef` to which you can send a `None`, and save the `ActorRef` (which will be the materialized value of this source).
3. Merge that source with the `Some`-wrapped elements at the start of `extractSubtree`.
4. Use a `statefulMapConcat` stage to track whether a) a `None` has been seen and b) how many subtrees are pending (initial value 1; for each node, add its number of first-generation children minus 1, so a node with no children subtracts 1). If a `None` has been seen and no subtrees are pending, emit a `List(None)`; otherwise emit a `List` of each subtree wrapped in a `Some`.
5. Follow it with a `takeWhile(_.isDefined)`, which will complete once it sees a `None`.
6. If `extractSubtree` contains other stages, you'll have to figure out where to put them.
7. Before the external input reaches `merge`, pass it through a `watchTermination` stage, and in the future callback (on success) send a `None` to the `ActorRef` you got when prematerializing the `Source.actorRef` for `extractSubtree`.

Thus, when the input completes, `watchTermination` will fire successfully and effectively send a message to `extractSubtree` telling it to watch for when it has completed the in-flight tree.
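The outline can be sketched end to end as follows. This is a minimal, hypothetical sketch assuming Akka 2.6 on Scala 2.13: the HTTP round trip (`createRequest ~> httpFlow ~> processResponse`) is replaced by a fake in-memory API so it runs offline, and `TreeCrawl`, `Response`, `api` and `fakeApi` are illustrative names, not part of the question's code. Like the outline, it assumes a single root per run, which is why the pending counter starts at 1.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{CompletionStrategy, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TreeCrawl {
  // Hypothetical response type: a node and the children the API reported for it
  final case class Response(node: String, children: List[String])

  // Hypothetical stand-in for the REST API (node -> children), matching the tree in the question
  val api: Map[String, List[String]] = Map(
    "A" -> List("B", "C"), "B" -> List("D", "E"), "C" -> List("F"),
    "D" -> Nil, "E" -> Nil, "F" -> Nil)

  def crawl(root: String): Seq[Response] = {
    implicit val system: ActorSystem = ActorSystem("tree-crawl")
    import system.dispatcher

    // Prematerialized side channel; doneRef is used later to send the None
    val (doneRef, doneSignal) = Source
      .actorRef[Option[Response]](
        PartialFunction.empty[Any, CompletionStrategy], // no message completes it
        PartialFunction.empty[Any, Throwable], // no message fails it
        1, // only a single None is ever sent
        OverflowStrategy.dropHead)
      .preMaterialize()

    // Fake replacement for createRequest ~> httpFlow ~> processResponse
    val fakeApi = Flow[String].mapAsync(2)(name => Future(Response(name, api(name))))

    // Wraps responses in Some, merges in the None channel and counts pending subtrees
    val extractSubtree: Flow[Response, String, NotUsed] = Flow[Response]
      .map[Option[Response]](Some(_))
      .merge(doneSignal)
      .statefulMapConcat[Option[String]] { () =>
        var pending = 1 // the single root is in flight before anything arrives here
        var inputDone = false // becomes true once the None arrives
        (msg: Option[Response]) =>
          msg match {
            case Some(resp) =>
              pending += resp.children.size - 1 // a node with no children subtracts 1
              if (inputDone && pending == 0) List(None)
              else resp.children.map(Some(_))
            case None =>
              inputDone = true
              if (pending == 0) List(None) else Nil
          }
      }
      .takeWhile(_.isDefined) // completes on the first None
      .collect { case Some(child) => child }

    // When the external input completes successfully, send None to the side channel
    val watchInput = Flow[String].watchTermination() { (_, done) =>
      done.foreach(_ => doneRef ! None)
      NotUsed
    }

    val cycle = Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val merge = b.add(Merge[String](2)) // eagerComplete = false (the default)
      val bcast = b.add(Broadcast[Response](2))
      merge ~> fakeApi ~> bcast
      bcast.out(0) ~> extractSubtree ~> merge.in(1)
      FlowShape(merge.in(0), bcast.out(1))
    })

    val result = Source.single(root).via(watchInput).via(cycle).runWith(Sink.seq)
    try Await.result(result, 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(crawl("A").map(_.node))
}
```

The order in which siblings come back may vary because the merge interleaves the input with the fed-back subtrees. The four-argument `Source.actorRef` overload is used because the two-argument one is deprecated in Akka 2.6.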