I want methods in my class to run some code on an IO thread, but only once a Subject they subscribe to has a certain value. Then the caller should get a response on the Android UI thread.
Something like this:
public class MyClass {
private final Subject<Boolean, Boolean> subject;
private final OtherClass otherObject;
public MyClass(Subject<Boolean, Boolean> subject,
OtherClass otherObject) {
this.subject = subject;
this.otherObject = otherObject;
public Observable<String> myMethod() {
return waitForTrue(() -> otherObject.readFromDisk());
private <T> Observable<T> waitForTrue(Callable<T> callable) {
return subject
.first(value -> value)
.flatMap(value -> Observable.fromCallable(callable))
Does this work? Not sure, so I wrote up a set of unit tests to check them. I found that my test methods, though they always worked when run one by one, would fail as part of a suite.
In fact, I found if I put the very same test twice, it would pass the first time, but fail the second!
public class MyClassTest {
private TestScheduler ioScheduler;
private TestScheduler androidScheduler;
private TestSubscriber<String> testSubscriber;
private MyClass objectUnderTest;
@Before public void setup() {
ioScheduler = new TestScheduler();
androidScheduler = new TestScheduler();
testSubscriber = new TestSubscriber<>();
RxJavaHooks.setOnIOScheduler(scheduler -> ioScheduler);
new RxAndroidSchedulersHook() {
@Override public Scheduler getMainThreadScheduler() {
return androidScheduler;
Subject<Boolean, Boolean> subject = BehaviorSubject.create(true);
MyClass.OtherClass otherClass = mock(MyClass.OtherClass.class);
objectUnderTest = new MyClass(subject, otherClass);
@Test public void firstTest() {
ioScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
androidScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
// This passes
@Test public void secondTest() {
// This fails!
Why is this happening? And is the bug in the class under test, or the test code?
I thought it might be an issue with using RxJava 1.x, but I had a similar issue with RxJava 2.x.
EDIT: The tests failed because of a missing line in the test code. You have to put this in the setup method:
because the hook is only ever called once, by the static initializer of the AndroidSchedulers
has no practical effect on a Subject
because they don't have a subscription side-effect to move off to another thread. Therefore, when they get a new item, they notify their consumers on the caller thread. Moving an item to another thread should be done via observeOn
private <T> Observable<T> waitForTrue(Callable<T> callable) {
return subject
.filter(value -> value)
.map(value -> callable.call())
Also you don't need flatMap
just to execute the callable
, map is enough.