Tags: scala, akka, akka-stream, akka-http

How to create a Source that can receive elements later via a method call?


I would like to create a Source and later push elements to it, as in:

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)

What is the recommended way to do this?

Thanks!


Solution

  • There are three ways this can be achieved:

    1. Post Materialization with SourceQueue

    You can use Source.queue, which materializes into a SourceQueue when the stream is run:

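    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{Sink, Source}

    // Requires an implicit Materializer (or, in Akka 2.6+, an implicit ActorSystem) in scope.
    case class Weather(zipCode: String, temperature: Double, raining: Boolean)

    val bufferSize = 100

    // If the buffer fills up, this strategy drops the oldest element
    // when a new element arrives.
    val overflowStrategy = OverflowStrategy.dropHead

    // `to` keeps the Source's materialized value (the queue) rather than the Sink's.
    val queue = Source.queue[Weather](bufferSize, overflowStrategy)
                      .filter(!_.raining)
                      .to(Sink.foreach(println))
                      .run()

    // offer returns a Future[QueueOfferResult]; this element is filtered out because it is raining.
    queue.offer(Weather("02139", 32.0, true))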
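
    As a fuller sketch of the queue approach, the example below collects the stream's output so that the effect of each offer can be observed. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer. The object name QueueDemo, the buffer size, the second reading, and the choice of OverflowStrategy.backpressure and Sink.seq are illustrative assumptions, not part of the original answer.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Weather(zipCode: String, temperature: Double, raining: Boolean)

// Hypothetical demo object; assumes Akka 2.6+.
object QueueDemo {
  def run(): (Seq[QueueOfferResult], Seq[Weather]) = {
    implicit val system: ActorSystem = ActorSystem("queue-demo")
    try {
      // Keep.both exposes the queue (from the Source) and the collected elements (from the Sink).
      val (queue, collected) =
        Source.queue[Weather](16, OverflowStrategy.backpressure)
          .filter(!_.raining)
          .toMat(Sink.seq)(Keep.both)
          .run()

      val readings = List(
        Weather("02139", 32.0, raining = true),
        Weather("94105", 61.5, raining = false)
      )
      // With the backpressure strategy, wait for each offer to finish before making the next one.
      val results = readings.map(w => Await.result(queue.offer(w), 3.seconds))

      queue.complete() // signal that no more elements will be offered
      (results, Await.result(collected, 3.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

    Checking the QueueOfferResult matters with dropping strategies as well, since an offer can report that an element was dropped rather than enqueued.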
    

    2. Post Materialization with Actor

    A similar question and answer covers this approach; the gist is that you materialize the stream as an ActorRef and send messages to that ref:

    // `to` keeps the Source's materialized value (the ActorRef) rather than the Sink's.
    val ref = Source.actorRef[Weather](Int.MaxValue, akka.stream.OverflowStrategy.fail)
                    .filter(!_.raining)
                    .to(Sink.foreach(println))
                    .run()

    ref ! Weather("02139", 32.0, true)
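
    The two-argument Source.actorRef shown above is deprecated in Akka 2.6 in favour of an overload that takes explicit completion and failure matchers. The sketch below assumes Akka 2.6 or later and uses that overload. The object name ActorRefDemo, the use of akka.Done as the completion message, the buffer size, and the sample readings are assumptions made for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Weather(zipCode: String, temperature: Double, raining: Boolean)

// Hypothetical demo object; assumes Akka 2.6+.
object ActorRefDemo {
  def run(): Seq[Weather] = {
    implicit val system: ActorSystem = ActorSystem("actor-ref-demo")
    try {
      val (ref, collected) =
        Source.actorRef[Weather](
          // Sending Done completes the stream after buffered elements are emitted.
          completionMatcher = { case Done => CompletionStrategy.draining },
          // No message is treated as a failure in this sketch.
          failureMatcher = PartialFunction.empty,
          bufferSize = 16,
          overflowStrategy = OverflowStrategy.dropHead
        )
          .filter(!_.raining)
          .toMat(Sink.seq)(Keep.both)
          .run()

      // Elements are pushed by sending plain messages to the materialized ActorRef.
      ref ! Weather("02139", 32.0, raining = true)
      ref ! Weather("94105", 61.5, raining = false)
      ref ! Done

      Await.result(collected, 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

    Unlike the queue, the ActorRef gives the sender no feedback: if the buffer overflows, elements are handled by the overflow strategy without the sender being told.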
    

    3. Pre Materialization with Actor

    Similarly, you could explicitly create an Actor that contains a message buffer, use that Actor to create a Source, and then send that Actor messages as described in the answer here:

    object WeatherForwarder {
      def props : Props = Props[WeatherForwarder]
    }
    
    //see provided link for example definition
    class WeatherForwarder extends Actor {...}
    
    val actorRef = actorSystem actorOf WeatherForwarder.props 
    
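    // note the stream has not been instantiated yet
    actorRef ! Weather("02139", 32.0, true)

    // the stream already has 1 Weather value to process, which is sitting in the
    // ActorRef's internal buffer
    // (Source.fromPublisher replaces the older Source(publisher) form;
    // ActorPublisher is deprecated since Akka 2.5 and removed in 2.6)
    val stream = Source.fromPublisher(ActorPublisher[Weather](actorRef)).runWith{...}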
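
    On Akka versions where ActorPublisher is no longer available, a comparable effect can be sketched with preMaterialize, which yields the ActorRef before the rest of the stream is attached. This is a different technique from the ActorPublisher approach in the answer, not a reconstruction of it. It assumes Akka 2.6 or later; PreMaterializeDemo, the Done completion message, and the buffer size are illustrative assumptions.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Weather(zipCode: String, temperature: Double, raining: Boolean)

// Hypothetical demo object; assumes Akka 2.6+.
object PreMaterializeDemo {
  def run(): Seq[Weather] = {
    implicit val system: ActorSystem = ActorSystem("pre-materialize-demo")
    try {
      // preMaterialize starts the source side now and returns a Source to attach later.
      val (ref, source) =
        Source.actorRef[Weather](
          completionMatcher = { case Done => CompletionStrategy.draining },
          failureMatcher = PartialFunction.empty,
          bufferSize = 16,
          overflowStrategy = OverflowStrategy.dropHead
        ).preMaterialize()

      // Sent before any downstream exists; the element waits in the source's buffer.
      ref ! Weather("02139", 32.0, raining = false)

      // Attach the rest of the stream, then signal completion.
      val collected = source.runWith(Sink.seq)
      ref ! Done

      Await.result(collected, 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

    The buffer here is bounded, so elements sent before the downstream is attached are subject to the overflow strategy once more than bufferSize of them accumulate.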