I attempt with
val executors = Executors.newSingleThreadExecutor()
val scheduler = Schedulers.from(executors)
Observable.just[Int](1,2,3).subscribeOn(scheduler)
output a error
Error:(103, 43) type mismatch;
found : rx.Scheduler
required: rx.lang.scala.Scheduler
Observable.just[Int](1,2,3).subscribeOn(scheduler)
^
how to use custom Scheduler in RxScala
?
The problem here is that you mix RxJava and RxScala code here. You see, RxScala is just a wrapper around the RxJava functionality; the former only forwards to the latter and does not have any 'real' implementations. This is useful as you only need to maintain 1 version rather than 2 or more.
The type of scheduler
in your example is rx.Scheduler
, so it is an RxJava Scheduler
. However, subscribeOn
requires you to provide an rx.lang.scala.Scheduler
, which is an RxScala Scheduler
. Hence you need to either convert the RxJava Scheduler
to one from RxScala.
However, for your case there is a better way of doing things: wrap your Executors.newSingleThreadExecutor
into a scala.concurrent.ExecutionContext
using the fromExecutor
factory method. Then wrap this into an rx.lang.scala.schedulers.ExecutionContextScheduler
and you have the scheduler you can use in subscribeOn
. Your code will look something like this (I included a print statement to see on which thread the stuff is running):
val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)
Observable.just(1, 2, 3)
.subscribeOn(s)
.doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
.subscribe()