Search code examples
scalaakka-streamakka-actor

order of events when using akka streams


Reading the documentation of akka-streams, I'm not really clear on things like the order of the messages and if I can enforce it. Let me set the context of my question with a small piece of code I wrote for a chat-server.

def flowShape(user: User) = GraphDSL
  .create(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) {
    implicit builder =>
      implicit chatSource =>

      import GraphDSL.Implicits._

      val messageFromOutside = builder.add(Flow[String].map {
        case msg: String => UserTextMessage(user, msg)
        case _ => InvalidMessage
      })

      val merge = builder.add(Merge[ChatMessage](2))
      // UPDATE --> this is where the change comes
      // val merge = builder.add(Concat[ChatMessage](2))

      // val channelActorSink = Sink.actorRefWithAck(channelActor, ActorInitMessage, AckMessage, UserLeft(user))
      val channelActorSink = Sink.actorRef(channelActor, UserLeft(user))

      val actorAsSource = builder.materializedValue.map { actor => UserJoined(user, actor) }

      actorAsSource ~> merge.in(0)
      messageFromOutside.out ~> merge.in(1)
      merge ~> channelActorSink

      FlowShape(messageFromOutside.in, chatSource.out)
}

To make matters simple for myself, I use this flow shape with a very simple source and sink. Something like this --

val source = Source(List[String]("hi", "hello", "what are you upto", "this is nice"))
val sink = Sink.foreach[ChatMessage] {
  case tm: UserTextMessage => println(s"${tm.user.username} :: ${tm.content}")
  case ul: UserLeft => println(s"${ul.user.username} just left the channel")
  case uj: UserJoined => println(s"${uj.user.username} just joined the channel")
  case _ => println(s"do not know what I just received")
}

val mychatchannel = new Channel(420, myactorsystem)

source.via(mychatchannel.chatFlow(User("sushruta"))).runWith(sink)

Now, here comes my concern. The order of events that is printed in the terminal is not ok at all. And I'm not sure how to fix it. Here's the output that I get --

[INFO] [11/10/2017 17:42:20.431] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.441] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/10/2017 17:42:20.443] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/10/2017 17:42:20.444] [akka-streams-akka.actor.default-dispatcher-5] [akka://akka-streams/user/channel-actor-420] sushruta sent a message

The first message hi is missing from the output. The hi message seems to have been sent before the UserJoin message was printed.

I tried fixing it (and also adding some safety around messaging) by using actorRefWithAck (which I commented out in the code above.) It gives a similar output.

[INFO] [11/11/2017 06:33:03.731] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] channel initialized and ready to take events
[INFO] [11/11/2017 06:33:03.735] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.736] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a user joined message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-4] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.737] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] sushruta sent a message
[INFO] [11/11/2017 06:33:03.738] [akka-streams-akka.actor.default-dispatcher-3] [akka://akka-streams/user/channel-actor-420] received a UserLeft message

Clearly what seems to be happening is that source is sending the messages before UserJoin message is sent. How can I fix this? Conceptually, I think I want the UserJoin message to be sent as soon as the source materializes but before it actually sends over the first message. Is that possible?

thanks


Solution

  • Think of streams as water pipes: when there is water, it will flow. The merge operator does not care from which side elements are coming. If you want to order these inputs, you need to tell that to Akka by using Concat instead.