In the following code I turn a TCP socket into an Observable[Array[Byte]]:
    import java.net.{InetSocketAddress, Socket}
    import rx.lang.scala.Observable
    import rx.lang.scala.schedulers.IOScheduler

    val sock = new Socket
    type Bytes = Array[Byte]

    lazy val s: Observable[Bytes] = Observable.using[Bytes, Socket] {
      sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
      sock
    }(
      socket => Observable.from[Bytes] {
        val incoming = socket.getInputStream
        val buffer = new Bytes(1024)
        Stream.continually {
          val read = incoming.read(buffer, 0, 1024)
          buffer.take(read)
        }.takeWhile(_.nonEmpty)
      },
      socket => {
        println("Socket disposed")
        socket.close
        s.retry // Does not work
      })
      .subscribeOn(IOScheduler.apply)

    s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)
The connection to the remote server may be interrupted at any moment, and in that case I'd like the Observable to try to reconnect automatically, but s.retry does not do anything. How can I achieve this? Also, can it be done "inside" the current Observable without creating a new one and re-subscribing?
You want to set up a new socket connection on each new subscription. This is easiest with (A)SyncOnSubscribe, ported to RxScala since version 0.26.5. Once you have this observable you can use normal error control methods like .retry.
Something like this:
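    import java.net.{InetSocketAddress, Socket}
    import scala.util.{Failure, Success, Try}
    import rx.lang.scala.{Notification, Observable}
    import rx.lang.scala.observables.SyncOnSubscribe

    val socketObservable: Observable[Byte] = Observable.create(SyncOnSubscribe.singleState(
      generator = () => {
        // Open a fresh socket per subscription; connect returns Unit, so it cannot be chained
        val sock = new Socket
        sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
        sock.getInputStream
      }
    )(next = is => Try(is.read()) match {
        case Success(-1)   => Notification.OnCompleted()
        case Success(byte) => Notification.OnNext(byte.toByte) // read() returns an Int in 0..255
        case Failure(e)    => Notification.OnError(e)
      },
      onUnsubscribe = is => Try(is.close) // closing the stream also closes the socket
    ))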
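To illustrate why a per-subscription connection combined with .retry gives automatic reconnection, here is a minimal sketch that needs no server. The names flaky, attempts and resilient are hypothetical, and Observable.defer stands in for the socket-opening generator: it fails on the first two "connection attempts" and succeeds on the third. Note that retry returns a new Observable; calling it and discarding the result, as in the question's dispose function, has no effect.

```scala
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger
import rx.lang.scala.Observable

// Hypothetical stand-in for the socket: "connecting" fails on the first two attempts.
val attempts = new AtomicInteger(0)

val flaky: Observable[String] = Observable.defer {
  // defer evaluates this block once per subscription, like the generator above
  val n = attempts.incrementAndGet()
  if (n < 3) Observable.error(new IOException(s"connect failed (attempt $n)"))
  else Observable.just("hello", "world")
}

// retry re-subscribes to the source on error, here at most 5 times.
// The result must be the observable you subscribe to.
val resilient: Observable[String] = flaky.retry(5)

val received: List[String] = resilient.toBlocking.toList
println(s"received $received after ${attempts.get} attempts")
```

With a real socket the same pattern would apply to socketObservable.retry, although an immediate, unbounded retry against an unreachable server can spin, so a bounded count as above is a cautious default.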
Note: this reads a single byte at a time and isn't terribly efficient. You can improve this with AsyncOnSubscribe or by having each event of your observable be an array of bytes.
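As a sketch of the array-of-bytes variant, the helper below reads one chunk per call. readChunk is a hypothetical name, not part of RxScala; inside next you could map None to Notification.OnCompleted() and Some(bytes) to Notification.OnNext(bytes), which would make the result an Observable[Array[Byte]]. The example runs against an in-memory stream so it can be tried without a server.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Hypothetical helper: read up to `size` bytes in one call.
// Returns None at end of stream, otherwise a fresh array holding only the bytes read.
def readChunk(in: InputStream, size: Int = 1024): Option[Array[Byte]] = {
  val buffer = new Array[Byte](size)
  val n = in.read(buffer, 0, size) // -1 signals end of stream
  if (n < 0) None else Some(buffer.take(n))
}

// Usage against an in-memory stream of 11 bytes, read in chunks of at most 8.
val in = new ByteArrayInputStream("hello world".getBytes("UTF-8"))
val first  = readChunk(in, 8).map(new String(_, "UTF-8"))
val second = readChunk(in, 8).map(new String(_, "UTF-8"))
val third  = readChunk(in, 8)
```

On a real socket a read may return fewer bytes than requested even when more data is on the way, so chunk boundaries carry no meaning for the protocol.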
Note: this is a cold observable and will create a new socket for each subscriber. For example, this will open two sockets:

    socketObservable.foreach(b => System.out.print(b))
    socketObservable.buffer(1024).foreach(kiloByte => System.out.println(kiloByte))

If this is not what you want, you can turn it into a hot one with .share.
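The cold versus hot difference can be seen without a socket. In this sketch the names connections, cold and hot are hypothetical, and Observable.defer counts how often the source is "opened". It uses publish and connect rather than .share so that both subscribers are attached before the source starts emitting, which keeps the demo deterministic.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer
import rx.lang.scala.Observable

// Hypothetical stand-in for the socket: counts how many times the source is opened.
val connections = new AtomicInteger(0)
val cold: Observable[Int] = Observable.defer {
  connections.incrementAndGet()
  Observable.just(1, 2, 3)
}

// publish turns the cold source into a ConnectableObservable:
// nothing is opened until connect is called.
val hot = cold.publish

val a = ListBuffer.empty[Int]
val b = ListBuffer.empty[Int]
hot.subscribe(x => a += x)
hot.subscribe(x => b += x)

// A single subscription to the source now feeds both subscribers.
hot.connect

println(s"connections = ${connections.get}, a = $a, b = $b")
```

Subscribing to cold twice instead would run the defer block twice, which corresponds to the two sockets mentioned above.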