Search code examples
scalaakkaakka-stream

How can I use acknowledgment semantics in a Flow?


I'd like to design a flow which will only pull from upstream when an ActorRef receives some acknowledgment message.

My use case is to use some static clustered actors to do work. I have a guardian which maintains whether or not there are available workers. When an actor becomes available, it will publish a message. I would like my Flow to be aware of this message to pull a job and push it downstream. Otherwise, it will backpressure.

When reading the docs, the GraphStage API doesn't seem to have some reactive component, relying on InHandlers for custom processing:

setHandler(in, new InHandler {
  override def onPush(): Unit = {
    println(grab(in))
    pull(in)
  }
})

I could potentially poll in this method for an update on the number of available actors, but this is not reactive.

Does Akka support acknowledgment in Flows by default?


Solution

  • Push Not Pull

    Instead of having the stream signal demand upstream once an Actor is free you could have the Source send data instead.

    You can create a Source that will materialize into an ActorRef which will be notified of a free Actor:

    object ActorIsFreeMessage
    
    val source : Source[ActorIsFreeMessage, ActorRef] = Source.actorRef(???, ???)
    

    You can then attach a Flow which will "pull a job" once the Source receives the message:

    type Job = ???
    
    val pullAJob : () => Job = ???
    
    val jobFlow : Flow[ActorIsFreeMessage, Job] = 
      Flow[ActorIsFreeMessage].map[Job](_ => pullAJob())
    
    val jobSource : Source[Job, ActorRef] = source via jobFlow
    

    Once this new Source is attached to a Sink the other Actors can send messages to the materialized ActorRef:

    val jobSink : Sink[Job, ActorRef] = ???
    
    val streamRef = jobSource.to(jobSink).run()
    
    //inside of the Actor
    streamRef ! ActorIsFreeMessage