Search code examples
scalaplayframeworkakkaakka-stream

Akka Streams and Scala Play server


I have a server written in scala play 2.6

I am trying to have the websocket

  1. Recieve a request from a client
  2. Process that request
  3. Broadcast the result to all clients if Right broadcast the error only to the client who sent the request if Left

I have the messages broadcasting to all the clients right now, does anyone know how to only respond back to the sender in the error case?

  val processFlow = Flow[String].map(process).map(_.toString)

  val (sink, source) = { 
    MergeHub.source[String]
      .via(processFlow)
      .toMat(BroadcastHub.sink[String])(Keep.both)
      .run()
  }

  val websocketFlow = Flow.fromSinkAndSource(sink, source)

  def ws = WebSocket.accept[String, String] { request =>  
    websocketFlow
  }

  def process(message: String): Either[String, String] = { 
    if (message == "error") { // replace with any error condition
      Left ("ERROR " ++ message)
    } else {
      Right (message ++ " processed")
    }   
  }

Solution

  • If you trace the sender in your flow, you can then filter the received message before sending them on the websocket:

    case class ProcessResult(senderId: String, result: Either[String, String])
    
    val (sink, source) = { 
      MergeHub.source[ProcessResult]
        .toMat(BroadcastHub.sink[ProcessResult])(Keep.both)
        .run()
    }
    val websocketFlow = Flow.fromSinkAndSource(sink, source)
    
    def ws = WebSocket.accept[String, String] { request =>
      // create a random id to identify the sender
      val senderId = UUID.randomUUID().toString
      Flow[String]
        .map(process)
        .map(result => ProcessResult(senderId, result))
        // broadcast the result to the other websockets
        .via(websocketFlow)
        // filter the results to only keep the errors for the sender
        .collect {
          case ProcessResult(sender, Left(error)) if sender == senderId => List(error)
          case ProcessResult(_, Left(error)) => List.empty
          case ProcessResult(_, Right(result)) => List(result)
        }.mapConcat(identity)
    }
    
    def process(message: String): Either[String, String] = { 
      if (message == "error") { // replace with any error condition
        Left ("ERROR " ++ message)
      } else {
        Right (message ++ " processed")
      }   
    }