Search code examples
scalaakka-stream

Trigger signal in akka streams


This code snippet in akka-stream cookbook documentation illustrates how to trigger the flow of elements programmatically :

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val zip = builder.add(Zip[Message, Trigger]())
  elements ~> zip.in0
  triggerSource ~> zip.in1
  zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
  ClosedShape
})

In this example, how Trigger and triggerSource would look like ?


Solution

  • 1.Trigger can be anything, as you can see from the graph logic, it's getting discarded all the time. Most likely it will be

    case object Trigger
    

    2.triggerSource is any valid Source[Trigger, _]. See the docs for a list of possibilities.

    Note that this specific example involves a ClosedShape materializing to NotUsed, so it will need tweaking if you need to access triggerSource materialized value, or connecting this to a more complex graph producing Triggers.