Restart Observable connected to a resource


In the following code I turn a TCP socket into an Observable[Array[Byte]]:

import java.net.{InetSocketAddress, Socket}

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler

val sock = new Socket
type Bytes = Array[Byte]

lazy val s: Observable[Bytes] = Observable.using[Bytes, Socket] {
  sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
  sock
}(
  socket => Observable.from[Bytes] {

    val incoming = socket.getInputStream
    val buffer = new Bytes(1024)

    Stream.continually {
      val read = incoming.read(buffer, 0, 1024)
      buffer.take(read)
    }.takeWhile(_.nonEmpty)

  },

  socket => {
    println("Socket disposed")
    socket.close
    s.retry // Does not work
  })
  .subscribeOn(IOScheduler.apply)

s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)

Connection to a remote server may be interrupted at any moment and in that case I'd like an Observable to try to reconnect automatically but s.retry does not do anything. How can I achieve this? Also can it be done "inside" the current Observable without creating a new one and re-subscribing?


Solution

  • You want to set up a new socket connection on each new subscription. This is easiest with (A)SyncOnSubscribe, which has been available in RxScala since version 0.26.5. Once you have this observable, you can use the normal error-handling methods such as .retry.

    Something like this:

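    import java.net.{InetSocketAddress, Socket}
    import scala.util.{Failure, Success, Try}
    import rx.lang.scala.{Notification, Observable}
    import rx.lang.scala.observables.SyncOnSubscribe

    val socketObservable: Observable[Byte] = Observable.create(SyncOnSubscribe.singleState(
      generator = () => {
        // Open a fresh socket for every subscription so that .retry can reconnect
        val sock = new Socket
        sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
        sock.getInputStream
      }
    )(
      next = is => Try(is.read()) match {
        case Success(-1) => Notification.OnCompleted // end of stream
        case Success(byte) => Notification.OnNext(byte.toByte)
        case Failure(e) => Notification.OnError(e)
      },
      // Closing the input stream also closes the underlying socket
      onUnsubscribe = is => Try(is.close())
    ))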
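
    To illustrate how .retry behaves once the connection is set up per subscription, here is a minimal sketch that assumes RxScala is on the classpath. The names RetryDemo, attempts and flaky are made up for the illustration; flaky is a stand-in for socketObservable that fails on its first two subscriptions and succeeds on the third:

    ```scala
    import java.io.IOException
    import java.util.concurrent.atomic.AtomicInteger

    import rx.lang.scala.Observable

    object RetryDemo {
      // Counts how many times the source has been subscribed to (hypothetical helper)
      val attempts = new AtomicInteger(0)

      // Stand-in for socketObservable: `defer` re-runs this block on every
      // subscription, just like the generator opens a new socket each time.
      val flaky: Observable[String] = Observable.defer {
        if (attempts.incrementAndGet() < 3)
          Observable.error(new IOException("connection lost"))
        else
          Observable.just("connected")
      }

      // retry(n) resubscribes to the source up to n times after an error
      def run(): List[String] = flaky.retry(5).toBlocking.toList
    }
    ```

    Each resubscription triggered by retry runs the deferred block again, which is why the resource has to be created inside the observable rather than outside it.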
    

    Note: this reads a single byte at a time, which is inefficient. You can improve on it with AsyncOnSubscribe, or by making each event of your observable an array of bytes.
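
    As a rough sketch of the "array of bytes per event" idea, the helper below reads an InputStream in chunks using only the standard library. ChunkedRead and chunks are hypothetical names, not part of RxScala; the resulting iterator could then be emitted from the observable one chunk at a time instead of one byte at a time:

    ```scala
    import java.io.InputStream
    import java.util.Arrays

    object ChunkedRead {
      // Lazily reads `in` in chunks of at most `chunkSize` bytes until end of stream.
      def chunks(in: InputStream, chunkSize: Int = 1024): Iterator[Array[Byte]] = {
        val buffer = new Array[Byte](chunkSize)
        Iterator
          .continually {
            val n = in.read(buffer, 0, chunkSize) // -1 signals end of stream
            // Copy the bytes that were read so chunks never share the buffer
            if (n < 0) None else Some(Arrays.copyOf(buffer, n))
          }
          .takeWhile(_.isDefined)
          .map(_.get)
      }
    }
    ```

    An I/O error still surfaces as an exception from the iterator, so it would need to be turned into an OnError notification (for example with Try) before .retry can react to it.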

    Note: this is a cold observable, so it creates a new socket for each subscriber. For example, this opens 2 sockets:

    socketObservable.foreach(b => System.out.print(b))
    socketObservable.buffer(1024).foreach(kiloByte => System.out.println(kiloByte))
    

    If this is not what you want, you can turn it into a hot one with .share.
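
    The difference between the cold and the shared case can be sketched as follows, assuming RxScala is on the classpath. HotColdDemo, opened and cold are illustrative names, and cold is a stand-in for socketObservable that counts subscriptions instead of opening sockets. The sketch uses publish and connect rather than .share (which is publish followed by refCount) so that both subscribers are attached before the source starts:

    ```scala
    import java.util.concurrent.atomic.AtomicInteger

    import scala.collection.mutable.ListBuffer

    import rx.lang.scala.Observable

    object HotColdDemo {
      // Counts how often the underlying source is subscribed to ("sockets opened")
      val opened = new AtomicInteger(0)

      // Stand-in for socketObservable
      val cold: Observable[Int] = Observable.defer {
        opened.incrementAndGet()
        Observable.just(1, 2, 3)
      }

      // Two subscribers on the cold observable: the source is subscribed twice
      def subscribeTwiceCold(): Int = {
        opened.set(0)
        cold.foreach(_ => ())
        cold.foreach(_ => ())
        opened.get
      }

      // Two subscribers on a published observable: the source is subscribed once
      def subscribeTwiceHot(): (Int, List[Int], List[Int]) = {
        opened.set(0)
        val a, b = ListBuffer.empty[Int]
        val hot = cold.publish
        hot.foreach(x => a += x)
        hot.foreach(x => b += x)
        hot.connect // start the single shared subscription
        (opened.get, a.toList, b.toList)
      }
    }
    ```

    With a real socket the events arrive asynchronously, so .share is usually enough; a subscriber that joins late simply misses the bytes emitted before it subscribed.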