java kotlin rx-java rx-java2

Trouble with subscribe to Observable in RxJava


I have an issue where my program is not getting the result from the Observable in the subscribe() method.

I'm trying to build a simple console app that sends a request to the server and then prints the result to the console. As far as I can see everything works fine, but I just can't get the result in subscribe(). It seems like the app finishes before the result is returned to the method.

Here is my code running the request:

coffeeShopApi.getCoffeeShops("")
    .subscribeOn(Schedulers.io())
    .subscribe({
         state.onNext(CoffeeShopViewState.CoffeeShopsLoaded(it))
    }, {
         it.printStackTrace()
         state.onNext(CoffeeShopViewState.Error(it.localizedMessage))
    })

After this code is executed, the program just finishes with exit code 0. It runs from the main function on the main thread. What could be the issue here?


Solution

  • The problem with your code is that you subscribe on a different thread than your current thread (main). Therefore, your main thread finishes before the subscriber callbacks for the individual items are called.

    Here is example code with its output:

    System.out.println(Thread.currentThread().getName() + ": start...");
    
    Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
    stream.subscribeOn(Schedulers.io())
          .subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next), 
                     error -> error.printStackTrace());
    
    Thread.sleep(1000L);
    
    System.out.println(Thread.currentThread().getName() + ": stop...");
    

    Output:

    main: start...
    RxCachedThreadScheduler-1: a
    RxCachedThreadScheduler-1: b
    RxCachedThreadScheduler-1: c
    main: stop...
    

    The output shows that the observable is created on the main thread, while the subscribe callbacks run on a different thread, RxCachedThreadScheduler-1. The Thread.sleep(1000L) call keeps the main thread alive long enough for the items to be emitted.

    An example with execution on the same thread:

    System.out.println(Thread.currentThread().getName() + ": start...");
    
    Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
    stream.subscribe(next -> System.out.println(Thread.currentThread().getName() + ": " + next), 
                     error -> error.printStackTrace());
    
    System.out.println(Thread.currentThread().getName() + ": stop...");
    

    Output:

    main: start...
    main: a
    main: b
    main: c
    main: stop...
    

    Depending on the use case, it can be necessary to wait for the stream to finish before the method continues, e.g. by using the same thread or a FutureObserver. In other use cases, offloading computationally expensive operations to a different thread is a feasible solution.
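
The idea of waiting for the work to finish, rather than guessing a sleep duration, can be sketched with the JDK alone. This is only an analogy and not RxJava code: the class `WaitForWorker`, the method `collectOnWorker`, and the thread name `worker-1` are made up for illustration, and a plain daemon thread stands in for the scheduler thread. In RxJava 2 itself, `blockingSubscribe(onNext, onError)` may be the more direct way to block the calling thread until the stream terminates.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WaitForWorker {

    // Processes the items on a background daemon thread and blocks the caller
    // until that thread signals completion (hypothetical helper, not an RxJava API).
    static List<String> collectOnWorker(List<String> items) throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                for (String item : items) {
                    received.add(Thread.currentThread().getName() + ": " + item);
                }
            } finally {
                done.countDown(); // signal completion even if the loop throws
            }
        }, "worker-1");
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive by itself
        worker.start();

        // Wait for the completion signal, with a timeout as a safety net.
        if (!done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not finish in time");
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        collectOnWorker(List.of("a", "b", "c")).forEach(System.out::println);
        System.out.println(Thread.currentThread().getName() + ": stop...");
    }
}
```

The latch replaces the fixed Thread.sleep(1000L): the main thread resumes as soon as the worker is done, and fails loudly if it never finishes.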


    As for the question of whether it is a daemon thread: yes, it is. Here is the test code:

    String message = String.format("%s-isDaemon?%s: start...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
    System.out.println(message);
    
    Observable<String> stream = Observable.fromIterable(List.of("a", "b", "c"));
    stream.subscribeOn(Schedulers.io())
          .subscribe(next -> {
                         final String innerMessage = String.format("%s-isDaemon?%s: %s", Thread.currentThread().getName(), Thread.currentThread().isDaemon(), next);
                         System.out.println(innerMessage);
                     },
                     Throwable::printStackTrace);
    
    Thread.sleep(1000L);
    
    message = String.format("%s-isDaemon?%s: stop...", Thread.currentThread().getName(), Thread.currentThread().isDaemon());
    System.out.println(message);
    

    Output:

    main-isDaemon?false: start...
    RxCachedThreadScheduler-1-isDaemon?true: a
    RxCachedThreadScheduler-1-isDaemon?true: b
    RxCachedThreadScheduler-1-isDaemon?true: c
    main-isDaemon?false: stop...
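
The daemon flag explains the exit code 0 in the question: the JVM exits once only daemon threads are left, so without the sleep nothing keeps the process alive after main returns. As a rough JDK-only sketch of how a thread pool ends up with daemon workers (the names `DaemonPoolDemo`, `DAEMON_FACTORY`, and `demo-daemon-worker` are hypothetical, and this does not reproduce RxJava's actual scheduler internals):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

public class DaemonPoolDemo {

    // Hypothetical factory: every thread it creates is marked as a daemon thread.
    static final ThreadFactory DAEMON_FACTORY = runnable -> {
        Thread t = new Thread(runnable, "demo-daemon-worker");
        t.setDaemon(true);
        return t;
    };

    // Runs one task on a daemon-backed pool and reports which thread executed it.
    static String runOnDaemonPool() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor(DAEMON_FACTORY);
        try {
            Callable<String> task = () -> String.format("%s-isDaemon?%s",
                    Thread.currentThread().getName(), Thread.currentThread().isDaemon());
            Future<String> result = pool.submit(task);
            return result.get(); // blocks the caller until the task has finished
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnDaemonPool());
    }
}
```

Here `Future.get()` plays the role of the sleep in the test code: it keeps the calling thread waiting until the daemon worker has produced its result.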