Search code examples
rx-scala

How to use custom Scheduler in RxScala?


I attempt with

val executors = Executors.newSingleThreadExecutor()
val scheduler = Schedulers.from(executors)
Observable.just[Int](1,2,3).subscribeOn(scheduler)

output a error

Error:(103, 43) type mismatch;
found   : rx.Scheduler
required: rx.lang.scala.Scheduler
Observable.just[Int](1,2,3).subscribeOn(scheduler)
                                      ^ 

how to use custom Scheduler in RxScala?


Solution

  • The problem here is that you mix RxJava and RxScala code here. You see, RxScala is just a wrapper around the RxJava functionality; the former only forwards to the latter and does not have any 'real' implementations. This is useful as you only need to maintain 1 version rather than 2 or more.

    The type of scheduler in your example is rx.Scheduler, so it is an RxJava Scheduler. However, subscribeOn requires you to provide an rx.lang.scala.Scheduler, which is an RxScala Scheduler. Hence you need to either convert the RxJava Scheduler to one from RxScala.

    However, for your case there is a better way of doing things: wrap your Executors.newSingleThreadExecutor into a scala.concurrent.ExecutionContext using the fromExecutor factory method. Then wrap this into an rx.lang.scala.schedulers.ExecutionContextScheduler and you have the scheduler you can use in subscribeOn. Your code will look something like this (I included a print statement to see on which thread the stuff is running):

    val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
    val s = ExecutionContextScheduler(e)
    
    Observable.just(1, 2, 3)
        .subscribeOn(s)
        .doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
        .subscribe()