This code snippet from the akka-stream cookbook documentation illustrates how to trigger the flow of elements programmatically:
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val zip = builder.add(Zip[Message, Trigger]())
  elements ~> zip.in0
  triggerSource ~> zip.in1
  zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
  ClosedShape
})
In this example, what would Trigger and triggerSource look like?
1. Trigger can be anything: as you can see from the graph logic, it is always discarded. Most likely it will be case object Trigger.
2. triggerSource is any valid Source[Trigger, _]. See the docs for a list of possibilities.
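The two points above can be sketched as follows. This is only a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to run a graph). The String alias for Message, the sample elements, the one-second tick interval, the println sink, and the object name ManualTriggerSketch are placeholders, not taken from the cookbook. Declaring a sealed trait next to the case object is one way to let Trigger serve both as the type parameter in Zip[Message, Trigger] and as the value being emitted.

```scala
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
import scala.concurrent.duration._

object ManualTriggerSketch {
  // Assumption: Message is a plain String here; any element type would do.
  type Message = String

  // Trigger carries no data. The sealed trait provides the type used in
  // Zip[Message, Trigger]; the case object is its single value.
  sealed trait Trigger
  case object Trigger extends Trigger

  // Placeholder data source (hypothetical sample elements).
  val elements: Source[Message, _] = Source(List("a", "b", "c"))

  // One possible triggerSource: emit a Trigger once per second.
  val triggerSource: Source[Trigger, Cancellable] =
    Source.tick(1.second, 1.second, Trigger)

  // Placeholder sink that prints every message that gets through.
  val sink = Sink.foreach[Message](println)

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val zip = builder.add(Zip[Message, Trigger]())
    elements ~> zip.in0
    triggerSource ~> zip.in1
    // The trigger is only used for pacing, so it is dropped here.
    zip.out ~> Flow[(Message, Trigger)].map { case (msg, _) => msg } ~> sink
    ClosedShape
  })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("manual-trigger")
    // Each tick lets one element through Zip. The ActorSystem keeps
    // running afterwards; call system.terminate() when you are done.
    graph.run()
  }
}
```

Because Zip only emits once both inputs have an element, the elements source is held back until a Trigger arrives, which is what makes the tick act as a gate.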
Note that this specific example involves a ClosedShape materializing to NotUsed, so it will need tweaking if you need to access triggerSource's materialized value, or if you want to connect this to a more complex graph producing Triggers.
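One possible tweak for getting at the materialized value is to pass triggerSource into GraphDSL.create, so that its materialized value becomes part of the graph's. The sketch below assumes Akka 2.6 or later. It uses Source.queue as the trigger source so that calling code can fire triggers by hand, and also passes in Sink.seq to collect the output. The buffer size of 16, the sample elements, and the object name MaterializedTriggerSketch are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source, SourceQueueWithComplete, Zip}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MaterializedTriggerSketch {
  // Assumptions: same placeholder definitions as in the question's setting.
  type Message = String
  sealed trait Trigger
  case object Trigger extends Trigger

  val elements = Source(List("a", "b", "c"))

  // Materializes to a queue that the caller can offer triggers to.
  val triggerSource: Source[Trigger, SourceQueueWithComplete[Trigger]] =
    Source.queue[Trigger](16, OverflowStrategy.backpressure)

  // Materializes to a Future holding every message that passed through.
  val sink: Sink[Message, Future[Seq[Message]]] = Sink.seq[Message]

  // Passing the source and sink to create() exposes their materialized
  // values; (_, _) combines them into a tuple.
  val graph: RunnableGraph[(SourceQueueWithComplete[Trigger], Future[Seq[Message]])] =
    RunnableGraph.fromGraph(
      GraphDSL.create(triggerSource, sink)((_, _)) { implicit builder => (triggers, out) =>
        import GraphDSL.Implicits._
        val zip = builder.add(Zip[Message, Trigger]())
        elements ~> zip.in0
        triggers.out ~> zip.in1
        zip.out ~> Flow[(Message, Trigger)].map { case (msg, _) => msg } ~> out.in
        ClosedShape
      })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("materialized-trigger")
    val (queue, result) = graph.run()
    // Each offered Trigger releases one element from `elements`.
    (1 to 3).foreach(_ => queue.offer(Trigger))
    // With three triggers offered, all three messages should come through.
    println(Await.result(result, 5.seconds))
    system.terminate()
  }
}
```

Inside the builder block, the imported source and sink appear as shapes rather than as Source and Sink values, which is why the wiring uses triggers.out and out.in.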