
How to add elements to Source dynamically?


I have example code that generates an unbounded source and consumes it:

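import java.util.concurrent.ThreadLocalRandom

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source

object Main {

  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    // Unbounded source: the iterator never runs out of elements
    val source: Source[String] = Source(() =>
      Iterator.continually("message:" + ThreadLocalRandom.current().nextInt(10000))
    )

    source.runForeach((item: String) => println(item))
      .onComplete { _ => system.shutdown() }
  }

}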

I want to create a class that implements:

trait MySources {
    def addToSource(item: String): Unit
    def getSource() : Source[String]
}

I need to use it from multiple threads, for example:

class MyThread(mySources: MySources) extends Thread {
  override def run(): Unit = {
    for(i <- 1 to 1000000) { // here will be infinite loop
        mySources.addToSource(i.toString)
    }
  }
} 

This is the full code I expect to end up with:

object Main {
  def main(args : Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val sources = new MySourcesImplementation()

    for(i <- 1 to 100) {
      (new MyThread(sources)).start()
    }

    val source = sources.getSource()

    source.runForeach((item:String) => { println(item) })
    .onComplete{ _ => system.shutdown() }
  }
}

How can I implement MySources?


Solution

  • One way to get an unbounded source is to back it with a special kind of actor, one that mixes in the ActorPublisher trait. If you create such an actor and wrap its ActorRef with a call to ActorPublisher.apply, you get a Reactive Streams Publisher instance, which you can pass to Source.apply to obtain a Source. After that, you only need to make sure your ActorPublisher class follows the Reactive Streams protocol when sending elements downstream: call onNext only while there is outstanding demand, and buffer elements otherwise. A trivial example follows:

    import akka.actor._
    import akka.stream.actor._
    import akka.stream.ActorFlowMaterializer
    import akka.stream.scaladsl._
    
    object DynamicSourceExample extends App{
    
      implicit val system = ActorSystem("test")
      implicit val materializer = ActorFlowMaterializer()
    
      val actorRef = system.actorOf(Props[ActorBasedSource])
      val pub = ActorPublisher[Int](actorRef)
    
      Source(pub).
        map(_ * 2).
        runWith(Sink.foreach(println))
    
      for(i <- 1 until 20){
        actorRef ! i.toString
        Thread.sleep(1000)
      }
    
    }
    
    class ActorBasedSource extends Actor with ActorPublisher[Int]{
      import ActorPublisherMessage._
      // Elements received while downstream has no outstanding demand
      var items: Vector[Int] = Vector.empty
    
      def receive = {
        case s: String =>
          // onNext may only be called while totalDemand > 0, so buffer otherwise
          if (totalDemand == 0)
            items = items :+ s.toInt
          else
            onNext(s.toInt)
    
        case Request(demand) =>
          // Downstream asked for `demand` more elements: flush that many from the buffer
          if (demand > items.size){
            items foreach (onNext)
            items = Vector.empty
          }
          else{
            val (send, keep) = items.splitAt(demand.toInt)
            items = keep
            send foreach (onNext)
          }
    
        case other =>
          println(s"got other $other")
      }
    }
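
The buffer-or-emit bookkeeping that the actor above performs can be easier to follow outside of Akka. The following is a minimal sketch in plain Scala with no Akka dependency. `DemandBuffer`, `offer`, `request` and `buffered` are hypothetical names introduced only for illustration and are not part of Akka or Reactive Streams. The sketch models a single rule: an element is emitted only while there is outstanding demand, and is buffered otherwise.

```scala
import scala.collection.immutable.Queue
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (not an Akka class): models how a publisher
// tracks demand and buffers elements it is not yet allowed to emit.
final class DemandBuffer[A](emit: A => Unit) {
  private var buffer: Queue[A] = Queue.empty
  private var demand: Long = 0L // plays the role of totalDemand

  // Producer side: hand over an element; it is emitted now or buffered.
  def offer(item: A): Unit = {
    buffer = buffer.enqueue(item)
    drain()
  }

  // Consumer side: signal that n more elements may be emitted.
  def request(n: Long): Unit = {
    require(n > 0, "demand must be positive")
    demand += n
    drain()
  }

  // Number of elements currently waiting for demand.
  def buffered: Int = buffer.size

  // Emit buffered elements in FIFO order while demand remains.
  private def drain(): Unit =
    while (demand > 0 && buffer.nonEmpty) {
      val (head, rest) = buffer.dequeue
      buffer = rest
      demand -= 1
      emit(head)
    }
}

object DemandBufferDemo {
  def main(args: Array[String]): Unit = {
    val out = ArrayBuffer.empty[Int]
    val buf = new DemandBuffer[Int](x => { out += x; () })

    buf.offer(1); buf.offer(2); buf.offer(3) // no demand yet: all buffered
    buf.request(2)                           // releases 1 and 2
    println(out.mkString(","))               // prints 1,2
    println(buf.buffered)                    // prints 1
  }
}
```

This sketch is single-threaded. In the actor version, thread safety comes from the actor processing one message at a time, which is why many threads can safely send items to the same ActorRef.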