I am learning RxJava and wrote the following code.
I use this class to create a list of Integers (the sleep simulates a slow source):
public class Sources {
public List<Integer> randomIntegerList() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
return IntStream
.range(0, 99)
.boxed()
.collect(Collectors.toCollection(ArrayList::new));
}
}
A small helper class:
public class Methods {
public boolean isEven(Integer integer) {
return integer % 2 == 0;
}
}
The problem is here:
public class DemoApplication {
public static void main(String[] args) {
new DemoApplication().executeSingle();
}
private void executeSingle() {
Single.<List<Integer>>create(emitter -> {
try {
// Emit inside the try block so onSuccess is never called after onError
emitter.onSuccess(new Sources().randomIntegerList());
} catch (InterruptedException e) {
emitter.onError(e);
}
})
.subscribe(this::printData);
System.out.println("Finished");
}
private void printData(List<Integer> list) {
list.stream()
.filter(y -> new Methods().isEven(y))
.forEach(System.out::println);
}
}
"Finished" is printed after a delay of 4 seconds, that is, only after subscribe() has done all of its work. How can I make this non-blocking? What I want is for the thread to keep executing the following lines and, once the data is ready and printData() is called, to stop what it is doing and execute printData() first.
By default, an Observable does its work on the thread that calls subscribe(), so the subscribing thread is blocked until the source has finished. I have modified your example slightly (using RxJava 3):
Observable<Integer> randIntegerObservable =
Observable.create(
emitter -> {
List<Integer> integers = new Sources().randomIntegerList();
for (Integer integer : integers) {
emitter.onNext(integer);
}
// Signal that no more items will follow
emitter.onComplete();
});
randIntegerObservable
.filter(i -> i % 2 == 0)
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " - " + i.toString()));
If you run the code above, you will see output like this:
main - 0
main - 2
main - 4
main - 6
main - 8 ...
You can use the subscribeOn() operator, which instructs the source to fire its emissions on a specified Scheduler (and therefore on a separate thread).
The code after adding subscribeOn():
Observable<Integer> randIntegerObservable =
Observable.create(
emitter -> {
List<Integer> integers = new Sources().randomIntegerList();
for (Integer integer : integers) {
emitter.onNext(integer);
}
// Signal that no more items will follow
emitter.onComplete();
});
randIntegerObservable
.filter(i -> i % 2 == 0)
.subscribeOn(Schedulers.io())
.subscribe(
i -> System.out.println(Thread.currentThread().getName() + " - " + i.toString())
);
TimeUnit.SECONDS.sleep(6);
And the output (do not forget the sleep at the end; the scheduler's threads are daemon threads, so otherwise the main thread exits before anything is printed):
RxCachedThreadScheduler-1 - 0
RxCachedThreadScheduler-1 - 2
RxCachedThreadScheduler-1 - 4
RxCachedThreadScheduler-1 - 6
RxCachedThreadScheduler-1 - 8 ..
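The same idea can be applied to the Single from the question. The following is only a minimal sketch, assuming RxJava 3 is on the classpath. The class name NonBlockingSingleDemo, the helper slowIntegerList() (a stand-in for Sources.randomIntegerList() with a shorter delay) and the events list are hypothetical names introduced for illustration. It uses Single.fromCallable() rather than Single.create(), and a CountDownLatch rather than a fixed sleep, to wait for the background result:

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NonBlockingSingleDemo {

    // Hypothetical stand-in for Sources.randomIntegerList(), with a shorter delay.
    static List<Integer> slowIntegerList() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return IntStream.range(0, 99).boxed().collect(Collectors.toList());
    }

    // Records what happened, in order, so the interleaving is visible.
    static List<String> run() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Single.fromCallable(NonBlockingSingleDemo::slowIntegerList)
            // Run the slow call on an I/O thread instead of the caller's thread.
            .subscribeOn(Schedulers.io())
            .subscribe(
                list -> {
                    long evens = list.stream().filter(i -> i % 2 == 0).count();
                    events.add("evens=" + evens);
                    done.countDown();
                },
                error -> {
                    events.add("error");
                    done.countDown();
                });

        // subscribe() has already returned; the list is still being produced.
        events.add("Finished");

        // Keep the caller alive until the background work has reported back.
        done.await(5, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Here "Finished" is recorded before the result arrives, because subscribe() returns immediately. Note that the callback runs on the scheduler's thread; the calling thread is not interrupted to run it.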
Check the ReactiveX documentation for the various Schedulers available. You can also use observeOn(), but there is a subtle difference between the two: subscribeOn() determines the thread on which the source is subscribed and emits, wherever it appears in the chain, whereas observeOn() switches the Scheduler only for the operators downstream of it. You can find more detail in this Stack Overflow answer.
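To make the difference concrete, here is a small sketch, again assuming RxJava 3; the class SchedulerPlacementDemo and its threads() method are hypothetical names used only for illustration. It records which thread runs the source and which thread runs a downstream operator, once with subscribeOn() and once with observeOn():

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class SchedulerPlacementDemo {

    // Returns { thread that ran the source, thread that ran the downstream map }.
    static String[] threads(boolean useSubscribeOn) {
        String[] sourceThread = new String[1];

        Observable<Integer> source = Observable.fromCallable(() -> {
            sourceThread[0] = Thread.currentThread().getName();
            return 1;
        });

        Observable<Integer> scheduled = useSubscribeOn
            ? source.subscribeOn(Schedulers.io())   // moves the source itself
            : source.observeOn(Schedulers.io());    // moves only what follows

        // blockingFirst() waits for the item, so no sleep is needed here.
        String downstreamThread = scheduled
            .map(i -> Thread.currentThread().getName())
            .blockingFirst();

        return new String[] { sourceThread[0], downstreamThread };
    }

    public static void main(String[] args) {
        String[] a = threads(true);
        System.out.println("subscribeOn: source=" + a[0] + ", downstream=" + a[1]);
        String[] b = threads(false);
        System.out.println("observeOn:   source=" + b[0] + ", downstream=" + b[1]);
    }
}
```

With observeOn() alone, the source still runs on the calling thread, so a slow source such as the 4-second sleep in the question would still block the caller; only subscribeOn() moves that work elsewhere.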