Search code examples
scalawebsocketakkaakka-streamakka-actor

mapMaterializedValue doing nothing with Source.actorRef


I am trying to send message to actor bindet with Source.actorRef, but this part of code:

println(s"Before mapping $src")
src.mapMaterializedValue { ref =>
  println(s"Mapping $ref")
  ref ! letter.text
}
println(s"After mapping $src")

is printing only something like this:

Before mapping Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])
After mapping Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])

So. Somehow mapMaterializedValue do not doing anything. For sure no message to actor sent. Is ref - None for some reason?

Further, I post all the code. It is plot of something like simple messenger (one to one messages) on websockets. I am for now just studing Akka streams, so this code is really not perfect. I am ready to hear any critics or advices.

Main server object:

package treplol.server

import treplol.common._

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}

import scala.io.StdIn
import java.util.UUID

object WsServer extends App {

  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()

  def createSource(uuid: UUID): Source[String, ActorRef] = {
    val src = Source.actorRef[String](0, OverflowStrategy.fail)
    sources(uuid) = src
    src
  }

  val sources: collection.mutable.HashMap[UUID, Source[String, ActorRef]] =
    collection.mutable.HashMap[UUID, Source[String, ActorRef]]()
  val userSources: collection.mutable.HashMap[String, UUID] =
    collection.mutable.HashMap[String, UUID]()

  def flow: Flow[Message, Message, Any] = {

    val uuid: UUID = UUID.randomUUID()
    val incomingSource: Source[String, ActorRef] = createSource(uuid)

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val merge = b.add(Merge[String](2))

      val mapMsgToLttr = b.add(
        Flow[Message].collect { case TextMessage.Strict(txt) => txt }
          .map[Letter] { txt =>
            WsSerializer.decode(txt) match {
              case Auth(from) =>
                userSources(from) = uuid
                Letter("0", from, "Authorized!")
              case ltr: Letter => ltr
            }
          }
      )

      val processLttr = b.add(
        Flow[Letter].map[String] { letter =>
          userSources.get(letter.to) flatMap sources.get match {
            case Some(src) =>
              println(s"Before mapping $src")
              src.mapMaterializedValue { ref =>
                println(s"Mapping $ref")
                ref ! letter.text
              }
              println(s"After mapping $src")
              ""
            case None => "Not authorized!"
          }
        }
      )

      val mapStrToMsg = b.add(
        Flow[String].map[TextMessage] (str => TextMessage.Strict(str))
      )

      mapMsgToLttr ~> processLttr ~> merge
                   incomingSource ~> merge ~> mapStrToMsg

      FlowShape(mapMsgToLttr.in, mapStrToMsg.out)
    })

  }

  val route = path("ws")(handleWebSocketMessages(flow))
  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher
  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ => system.terminate())
}

Common package:

package treplol

package object common {

  trait WsMessage
  case class Letter(from: String, to: String, text: String) extends WsMessage
  case class Auth(from: String) extends WsMessage

  object WsSerializer {

    import org.json4s.{Extraction, _}
    import org.json4s.jackson.JsonMethods.{compact, parse}
    import org.json4s.jackson.Serialization

    implicit val formats = {
      Serialization.formats(NoTypeHints)
    }

    case class WsData(typeOf: String, data: String)
    object WsDataType {
      val LETTER  = "letter"
      val AUTH    = "auth"
    }

    class WrongIncomingData extends Throwable

    def decode(wsJson: String): WsMessage = parse(wsJson).extract[WsData] match {
      case WsData(WsDataType.LETTER, data) => parse(data).extract[Letter]
      case WsData(WsDataType.AUTH, data) => parse(data).extract[Auth]
      case _ => throw new WrongIncomingData
    }

    def encode(wsMessage: WsMessage): String = {
      val typeOf = wsMessage match {
        case _: Letter => WsDataType.LETTER
        case _: Auth => WsDataType.AUTH
        case _ => throw new WrongIncomingData
      }
      compact(Extraction.decompose(
        WsData(typeOf, compact(Extraction.decompose(wsMessage)))
      ))
    }
  }

}

build.sbt

name := "treplol"

version := "0.0"

scalaVersion := "2.12.1"

resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.16",
  "com.typesafe.akka" %% "akka-stream" % "2.4.16",
  "com.typesafe.akka" %% "akka-http" % "10.0.3",
  "org.json4s" %% "json4s-jackson" % "3.5.0"
)

Thank you all in advance!


Solution

  • As per the docs, the mapMaterializedValue combinator

    Transform only the materialized value of this Source, leaving all other properties as they were.

    The materialized value is only available after any graph stage (in this case, the source) is run. You are never running your source in your code.

    Note that the Flow[Message, Message, Any] you use to handle WebSocket messages is actually run by Akka-HTTP infrastructure, so you don't need to do it manually. However, the Source you create in the body of processLttr are not attached to the rest of the graph, and are therefore not run.

    Refer to the docs for more info on running graphs and materialization.