I am trying to send message to actor bindet with Source.actorRef, but this part of code:
println(s"Before mapping $src")
src.mapMaterializedValue { ref =>
println(s"Mapping $ref")
ref ! letter.text
}
println(s"After mapping $src")
is printing only something like this:
Before mapping Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])
After mapping Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])
So. Somehow mapMaterializedValue do not doing anything. For sure no message to actor sent. Is ref - None for some reason?
Further, I post all the code. It is plot of something like simple messenger (one to one messages) on websockets. I am for now just studing Akka streams, so this code is really not perfect. I am ready to hear any critics or advices.
Main server object:
package treplol.server
import treplol.common._
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}
import scala.io.StdIn
import java.util.UUID
object WsServer extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
def createSource(uuid: UUID): Source[String, ActorRef] = {
val src = Source.actorRef[String](0, OverflowStrategy.fail)
sources(uuid) = src
src
}
val sources: collection.mutable.HashMap[UUID, Source[String, ActorRef]] =
collection.mutable.HashMap[UUID, Source[String, ActorRef]]()
val userSources: collection.mutable.HashMap[String, UUID] =
collection.mutable.HashMap[String, UUID]()
def flow: Flow[Message, Message, Any] = {
val uuid: UUID = UUID.randomUUID()
val incomingSource: Source[String, ActorRef] = createSource(uuid)
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[String](2))
val mapMsgToLttr = b.add(
Flow[Message].collect { case TextMessage.Strict(txt) => txt }
.map[Letter] { txt =>
WsSerializer.decode(txt) match {
case Auth(from) =>
userSources(from) = uuid
Letter("0", from, "Authorized!")
case ltr: Letter => ltr
}
}
)
val processLttr = b.add(
Flow[Letter].map[String] { letter =>
userSources.get(letter.to) flatMap sources.get match {
case Some(src) =>
println(s"Before mapping $src")
src.mapMaterializedValue { ref =>
println(s"Mapping $ref")
ref ! letter.text
}
println(s"After mapping $src")
""
case None => "Not authorized!"
}
}
)
val mapStrToMsg = b.add(
Flow[String].map[TextMessage] (str => TextMessage.Strict(str))
)
mapMsgToLttr ~> processLttr ~> merge
incomingSource ~> merge ~> mapStrToMsg
FlowShape(mapMsgToLttr.in, mapStrToMsg.out)
})
}
val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => system.terminate())
}
Common package:
package treplol
package object common {
trait WsMessage
case class Letter(from: String, to: String, text: String) extends WsMessage
case class Auth(from: String) extends WsMessage
object WsSerializer {
import org.json4s.{Extraction, _}
import org.json4s.jackson.JsonMethods.{compact, parse}
import org.json4s.jackson.Serialization
implicit val formats = {
Serialization.formats(NoTypeHints)
}
case class WsData(typeOf: String, data: String)
object WsDataType {
val LETTER = "letter"
val AUTH = "auth"
}
class WrongIncomingData extends Throwable
def decode(wsJson: String): WsMessage = parse(wsJson).extract[WsData] match {
case WsData(WsDataType.LETTER, data) => parse(data).extract[Letter]
case WsData(WsDataType.AUTH, data) => parse(data).extract[Auth]
case _ => throw new WrongIncomingData
}
def encode(wsMessage: WsMessage): String = {
val typeOf = wsMessage match {
case _: Letter => WsDataType.LETTER
case _: Auth => WsDataType.AUTH
case _ => throw new WrongIncomingData
}
compact(Extraction.decompose(
WsData(typeOf, compact(Extraction.decompose(wsMessage)))
))
}
}
}
build.sbt
name := "treplol"
version := "0.0"
scalaVersion := "2.12.1"
resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.16",
"com.typesafe.akka" %% "akka-stream" % "2.4.16",
"com.typesafe.akka" %% "akka-http" % "10.0.3",
"org.json4s" %% "json4s-jackson" % "3.5.0"
)
Thank you all in advance!
As per the docs, the mapMaterializedValue
combinator
Transform only the materialized value of this Source, leaving all other properties as they were.
The materialized value is only available after any graph stage (in this case, the source) is run. You are never running your source in your code.
Note that the Flow[Message, Message, Any]
you use to handle WebSocket messages is actually run by Akka-HTTP infrastructure, so you don't need to do it manually. However, the Source
you create in the body of processLttr
are not attached to the rest of the graph, and are therefore not run.
Refer to the docs for more info on running graphs and materialization.