Tags: scala, playframework, websocket, playframework-2.2, rx-java

How to convert RX Observable to Play Enumerator


I successfully set up a websocket in Play using its native Enumerator construct, calling some code that returns a String:

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
    Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

Now I want my operation function to return an rx.lang.scala.Observable[String] instead of a String, and I want to push each String to the client as soon as the Observable emits it. How can I map this Observable to a play.api.libs.iteratee.Enumerator?


Solution

  • You can use the implicit conversions from Bryan Gilbert. They work fine, but be careful to use the updated version of his conversions: in the answer from Jeroen Kransen, unsubscribe is never called, so the subscription to the Observable is never released (and that's bad!).

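      import play.api.libs.concurrent.Execution.Implicits.defaultContext
      import play.api.libs.iteratee.{Concurrent, Enumerator, Input}
      import play.api.libs.iteratee.Concurrent.Channel
      import rx.lang.scala.{Observable, Observer, Subscription}

      /*
       * Observable to Enumerator
       */
      implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
        // set in onStart so that the cleanup callbacks below can reach the subscription
        @volatile var subscription: Option[Subscription] = None
        // unicast creates a channel where you can push data and returns an Enumerator
        Concurrent.unicast[T](
          onStart = { channel => subscription = Some(obs.subscribe(new ChannelObserver(channel))) },
          // onComplete and onError are parameters of unicast; a tuple returned from onStart would be discarded
          onComplete = subscription.foreach(_.unsubscribe()),
          onError = (_: String, _: Input[T]) => subscription.foreach(_.unsubscribe())
        )
      }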
    
      class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
        override def onNext(elem: T): Unit = channel.push(elem)
        override def onCompleted(): Unit = channel.end()
        override def onError(e: Throwable): Unit = channel.end(e)
      }
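
    To connect this back to the question, here is a minimal sketch of the WebSocket action using the conversion. It assumes the definitions above live in an object named RxConversions; that name, StatusController, and the interval-based operation are placeholders for illustration, not part of the original answer.

```scala
import play.api.libs.iteratee.{Enumerator, Iteratee}
import play.api.mvc.{Controller, WebSocket}
import rx.lang.scala.Observable
import scala.concurrent.duration._

// Hypothetical object assumed to hold observable2Enumerator and ChannelObserver
import RxConversions._

object StatusController extends Controller {

  // Stand-in for the asker's operation: emits a String every 3 seconds
  def operation: Observable[String] =
    Observable.interval(3.seconds).map(n => s"tick $n")

  def operationStatusFeed = WebSocket.using[String] { implicit request =>
    // Ignore anything the client sends
    val in = Iteratee.ignore[String]
    // The type ascription triggers the implicit Observable -> Enumerator conversion
    val out: Enumerator[String] = operation
    (in, out)
  }
}
```

    Each value emitted by the Observable is pushed into the channel as it arrives, so the client no longer has to wait for a fixed polling interval.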
    

    For completeness, here is the conversion from Enumerator to Observable:

      /*
       * Enumerator to Observable
       */
      implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
        // creating the Observable that we return
        Observable({ observer: Observer[T] =>
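          // flag used to unsubscribe from the observable; @volatile because it is
          // set by the subscriber and read while the enumerator is producing data
          @volatile var cancelled = false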
    
          // enumerator input is tested with this predicate
          // once cancelled is set to true, the enumerator will stop producing data
          val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)
    
          // applying iteratee on producer, passing data to the observable
          cancellableEnum (
            Iteratee.foreach(observer.onNext(_))
          ).onComplete { // passing completion or error to the observable
            case Success(_) => observer.onCompleted()
            case Failure(e) => observer.onError(e)
          }
    
          // unsubscription will change the var to stop the enumerator above via the breakE function
          new Subscription { override def unsubscribe() = { cancelled = true } }
        })
      }
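
    A short usage sketch for this direction, again assuming the conversion is importable from a hypothetical RxConversions object (EnumeratorDemo and the sample values are made up for illustration):

```scala
import play.api.libs.iteratee.Enumerator
import rx.lang.scala.{Observable, Subscription}

// Hypothetical object assumed to hold enumerator2Observable
import RxConversions._

object EnumeratorDemo {
  def run(): Subscription = {
    // A finite Enumerator standing in for any Play data source
    val source: Enumerator[String] = Enumerator("a", "b", "c")
    // The type ascription triggers the implicit Enumerator -> Observable conversion
    val obs: Observable[String] = source
    // Subscribing is what applies the Iteratee to the Enumerator
    obs.subscribe(
      (s: String) => println(s"next: $s"),
      (e: Throwable) => println(s"error: ${e.getMessage}"),
      () => println("completed")
    )
  }
}
```

    Calling unsubscribe() on the returned Subscription sets the cancelled flag, so breakE stops the Enumerator at the next element it would have produced.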
    

    Rx for WebSockets in Play

    On the other hand, you may notice that most of the time you deal with Iteratees and Enumerators in Play, it is because you are working with WebSockets (as you are here). We all agree that Iteratees are much less intuitive than Observables, and this is probably why you are using Rx in your Play project.

    Based on that observation, I've built a library called WidgetManager that does exactly this: it integrates Rx into Play and gets rid of direct Iteratee manipulation.

    Using that library, your code could simply be:

    def operationStatusFeed = WebSocket.using[String] { implicit request =>
    
      // you can optionally give a function to process data from the client (processClientData)
      // and a function to execute when connection is closed (onClientClose)
      val w = new WidgetManager()
    
      w.addObservable("op", operation)
    
      // subscribe to it and push data in the socket to the client (automatic JS callback called)
      w.subscribePush("op")
    
      // deals with Iteratees and Enumerators for you and returns what's needed
      w.webSocket
    }
    

    The library is on GitHub: RxPlay (contributions are welcome).