
How to convert a reactive Publisher to a simple Stream in Scala?


Is it possible to convert an org.reactivestreams.Publisher instance to a scala.Stream? If so, how?


Solution

  • Will something like the following work for you?

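    import java.util.concurrent.BlockingQueue
    import org.reactivestreams.{Subscriber, Subscription}

    val queue: BlockingQueue[T] = ... // TODO: choose appropriate BlockingQueue implementation

    publisher.subscribe(new Subscriber[T] {
      // Without a request the publisher delivers nothing; ask for everything up front.
      override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
      override def onNext(t: T): Unit = queue.put(t)

      // TODO: signal failure and completion to the consumer of the stream
      override def onError(e: Throwable): Unit = ()
      override def onComplete(): Unit = ()
    })

    // Blocks on each element; as written the stream never terminates.
    val stream = Stream.continually(queue.take())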
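
One way to fill in the remaining Subscriber methods, so that the stream ends on completion and rethrows on failure, is to put tagged signals on the queue instead of bare elements. The following is only a sketch under some assumptions: it uses java.util.concurrent.Flow (JDK 9+), which has the same four-method Subscriber interface as org.reactivestreams, so that it compiles without extra dependencies. The names PublisherToStream, toStream, Signal, Next, Done and Failed are made up for illustration. On Scala 2.13+ Stream is deprecated in favour of LazyList, but it still compiles.

```scala
import java.util.concurrent.{Flow, LinkedBlockingQueue}

object PublisherToStream {
  // Hypothetical signal type: what the subscriber hands to the consumer.
  private sealed trait Signal[+T]
  private final case class Next[T](value: T) extends Signal[T]
  private case object Done extends Signal[Nothing]
  private final case class Failed(cause: Throwable) extends Signal[Nothing]

  def toStream[T](publisher: Flow.Publisher[T]): Stream[T] = {
    // Unbounded queue: simple, but gives up backpressure (see note below).
    val queue = new LinkedBlockingQueue[Signal[T]]()

    publisher.subscribe(new Flow.Subscriber[T] {
      override def onSubscribe(s: Flow.Subscription): Unit = s.request(Long.MaxValue)
      override def onNext(t: T): Unit = queue.put(Next(t))
      override def onError(e: Throwable): Unit = queue.put(Failed(e))
      override def onComplete(): Unit = queue.put(Done)
    })

    Stream
      .continually(queue.take()) // blocks until the next signal arrives
      .takeWhile(_ != Done)      // completion ends the stream
      .map {
        case Next(value)   => value
        case Failed(cause) => throw cause // surfaces when this element is forced
        case Done          => throw new IllegalStateException("filtered out by takeWhile")
      }
  }
}
```

Two things to keep in mind with this approach. A Stream evaluates its head eagerly, so toStream itself blocks until the publisher emits its first signal, and the publisher therefore has to emit from another thread or synchronously during subscribe. Requesting Long.MaxValue into an unbounded queue also means a fast publisher can fill memory; a bounded queue combined with smaller request(n) calls would restore backpressure at the cost of more bookkeeping.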