I'd like to design a flow which will only pull from upstream when an ActorRef
receives some acknowledgment message.
My use case is to use some static clustered actors to do work. I have a guardian which maintains whether or not there are available workers. When an actor becomes available, it will publish a message. I would like my Flow to be aware of this message to pull a job and push it downstream. Otherwise, it will backpressure.
When reading the docs, the GraphStage
API doesn't seem to have some reactive component, relying on InHandler
s for custom processing:
setHandler(in, new InHandler {
override def onPush(): Unit = {
println(grab(in))
pull(in)
}
})
I could potentially poll in this method for an update on the number of available actors, but this is not reactive.
Does Akka support acknowledgment in Flows by default?
Push Not Pull
Instead of having the stream signal demand upstream once an Actor
is free you could have the Source
send data instead.
You can create a Source that will materialize into an ActorRef
which will be notified of a free Actor:
object ActorIsFreeMessage
val source : Source[ActorIsFreeMessage, ActorRef] = Source.actorRef(???, ???)
You can then attach a Flow
which will "pull a job" once the Source receives the message:
type Job = ???
val pullAJob : () => Job = ???
val jobFlow : Flow[ActorIsFreeMessage, Job] =
Flow[ActorIsFreeMessage].map[Job](_ => pullAJob())
val jobSource : Source[Job, ActorRef] = source via jobFlow
Once this new Source is attached to a Sink the other Actors can send messages to the materialized ActorRef
:
val jobSink : Sink[Job, ActorRef] = ???
val streamRef = jobSource.to(jobSink).run()
//inside of the Actor
streamRef ! ActorIsFreeMessage