
Akka Streams - Splitting a flow into multiple Sinks


I have a TCP connection in Akka Streams that ends in a Sink. Right now all messages go into one Sink. I want to split the stream into an unknown number of Sinks based on some function of each message.

The use case is as follows: from the TCP connection I get a continuous stream of something like List[DeltaValue], and I want to create an actor Sink for each DeltaValue.id so that I can continuously accumulate values and implement behaviour per DeltaValue.id. This seems like a standard use case in stream processing, but I haven't been able to find a good example with Akka Streams.

This is what I have right now:

def connect(): ActorRef = tcpConnection
    // SOMEHOW SPLIT HERE and create a ReceiverActor for each message
    .to(Sink.actorRef(system.actorOf(ReceiverActor.props(), ReceiverActor.name), akka.Done))
    .run()

Update: I now have the following. It does not feel very robust, but it seems to work:

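  // onComplete (and Sink.foreachParallel) need an implicit ExecutionContext,
  // e.g. `import system.dispatcher`
  private def spawnActorOrSendMessage(m: ResponseMessage): Unit = {
    implicit val timeout: Timeout = Timeout(FiniteDuration(1, TimeUnit.SECONDS))
    system.actorSelection("user/" + m.id.toString).resolveOne().onComplete {
      case Success(actorRef) => actorRef ! m
      // Not found: create the actor. With foreachParallel(10), two lookups for the same
      // new id can both fail, and the second actorOf then throws InvalidActorNameException.
      case Failure(_) => system.actorOf(ReceiverActor.props(), m.id.toString) ! m
    }
  }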

  def connect(): ActorRef = tcpConnection
    .to(Sink.foreachParallel(10)(spawnActorOrSendMessage))
    .run()

Solution

  • The following should be a somewhat improved version of the code in the question's update. The main improvement is that the actors are kept in a Map keyed by id, which avoids an actorSelection lookup for every incoming message.

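      import akka.NotUsed
      import akka.actor.ActorRef
      import akka.stream.scaladsl.Source

      case class DeltaValue(id: String, value: Double)

      val src: Source[DeltaValue, NotUsed] = ???

      // Assumes `system` and an implicit Materializer are in scope, as in the question.
      // The fold state is an immutable Map holding one actor per id.
      src.runFold(Map.empty[String, ActorRef]) {
        case (actors, elem) if actors.contains(elem.id) =>
          // Known id: forward the value to its existing actor
          actors(elem.id) ! elem.value
          actors
        case (actors, elem) =>
          // New id: spawn an actor for it. Actor names must be unique, so reusing a
          // fixed name would fail on the second id; here the name is left to Akka.
          val newActor = system.actorOf(ReceiverActor.props())
          newActor ! elem.value
          actors.updated(elem.id, newActor)
      }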
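A self-contained sketch of the Map-based fold that can be run end to end. The finite `Source` stands in for the TCP connection, and `ReceiverActor` here is a hypothetical actor that only sums the values it receives. Each actor is created without an explicit name, since actor names must be unique under the same parent. Akka 2.6 classic actors are assumed, where an implicit `ActorSystem` also provides the `Materializer`.

```scala
import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.scaladsl.Source

import scala.concurrent.Await
import scala.concurrent.duration._

case class DeltaValue(id: String, value: Double)

// Hypothetical ReceiverActor: keeps a running sum of the values sent to it.
class ReceiverActor extends Actor {
  private var total = 0d
  def receive: Receive = {
    case v: Double => total += v
  }
}

object ReceiverActor {
  def props(): Props = Props(new ReceiverActor)
}

object PerIdActorsDemo {
  // Assumes Akka 2.6: the implicit ActorSystem also supplies the Materializer.
  implicit val system: ActorSystem = ActorSystem("per-id-actors")

  // A finite source standing in for the TCP connection.
  val src: Source[DeltaValue, NotUsed] =
    Source(List(DeltaValue("a", 1.0), DeltaValue("b", 2.0), DeltaValue("a", 3.0)))

  def run(): Map[String, ActorRef] = {
    val actorsF = src.runFold(Map.empty[String, ActorRef]) { (actors, elem) =>
      // getOrElse takes its default by name, so an actor is spawned only for unseen ids
      val actor = actors.getOrElse(elem.id, system.actorOf(ReceiverActor.props()))
      actor ! elem.value
      actors.updated(elem.id, actor)
    }
    Await.result(actorsF, 5.seconds)
  }
}
```

The future completes only when the source completes, so on a never-ending TCP stream the Map is never handed back; the fold is useful here only for its side effects.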
    

    Keep in mind that when you integrate Akka Streams with bare actors through fire-and-forget `!`, you lose backpressure: the stream keeps pushing elements no matter how fast the actors process them. This is one of the reasons to implement your logic within the boundaries of Akka Streams whenever possible, although that is not always feasible, e.g. when remoting is needed.
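One common way to keep backpressure while still talking to an actor is to `ask` it inside `mapAsync`. This is a different technique from the fold above. The stream then waits for the actor's reply before pulling more elements, with at most `parallelism` requests in flight. `Accumulator` is a hypothetical actor that replies with its running total. Akka 2.6 classic APIs are assumed.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

case class DeltaValue(id: String, value: Double)

// Hypothetical actor: adds each value and replies with the new running total.
class Accumulator extends Actor {
  private var total = 0d
  def receive: Receive = {
    case d: DeltaValue =>
      total += d.value
      sender() ! total
  }
}

object Accumulator {
  def props: Props = Props(new Accumulator)
}

object AskBackpressureDemo {
  implicit val system: ActorSystem = ActorSystem("ask-backpressure")
  implicit val timeout: Timeout = Timeout(3.seconds)

  val accumulator = system.actorOf(Accumulator.props)

  def run(): Seq[Double] = {
    val totals: Future[Seq[Double]] =
      Source(List(DeltaValue("a", 1.0), DeltaValue("a", 2.0), DeltaValue("a", 3.0)))
        // With parallelism = 1 the next element is pulled only after the actor has
        // replied, so a slow actor slows the whole stream down instead of its mailbox growing.
        .mapAsync(parallelism = 1)(d => (accumulator ? d).mapTo[Double])
        .runWith(Sink.seq)
    Await.result(totals, 5.seconds)
  }
}
```

`mapAsync` keeps the output in input order; `mapAsyncUnordered` could be used if ordering does not matter.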

    In your case, you could consider leveraging groupBy and substreams. The example below folds the elements of each substream by summing their values, just to give an idea:

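      src.groupBy(maxSubstreams = Int.MaxValue, f = _.id)
        .fold("" -> 0d) {
          // Carry the id along with the sum. The parentheses matter:
          // `id -> a + b` would parse as `(id -> a) + b`.
          case ((_, acc), delta) => delta.id -> (delta.value + acc)
        }
        .mergeSubstreams
        .runForeach(println)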
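Note that `fold` emits only when its substream completes, which never happens on a continuous TCP stream. Below is a runnable sketch that swaps in `scan` so that every element produces an updated per-id total. A finite list stands in for the connection, `maxSubstreams = 100` is an arbitrary illustrative bound (the stream fails if more distinct ids arrive), and Akka 2.6 is assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

case class DeltaValue(id: String, value: Double)

object GroupByScanDemo {
  implicit val system: ActorSystem = ActorSystem("groupby-scan")

  val deltas = List(DeltaValue("a", 1.0), DeltaValue("b", 10.0), DeltaValue("a", 2.0))

  def run(): Seq[(String, Double)] = {
    val out = Source(deltas)
      .groupBy(maxSubstreams = 100, f = _.id) // one substream per distinct id
      .scan("" -> 0d) {
        // Same accumulation as the fold, but emitted after every element
        case ((_, acc), d) => d.id -> (acc + d.value)
      }
      .drop(1) // scan emits its seed first; discard it
      .mergeSubstreams
      .runWith(Sink.seq)
    Await.result(out, 5.seconds)
  }
}
```

Ordering across ids after `mergeSubstreams` is not guaranteed, but updates for a single id stay in order, e.g. `("a", 1.0)` before `("a", 3.0)`.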