Tags: java, timeout, rx-java, reactive-programming

Timeout in RxJava


I'm new to RxJava, and I need to use Observable asynchronously.

I also need timeouts: in my example, I want every process to finish in 1 second or less.

Here is what I've done so far:

public static void hello(String name) throws IOException {
    Observable<String> obs2 = Observable.just(name)
            .timeout(1000, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io());
    obs2.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            if ("CCCCC".equals(s)) {
                try {
                    Thread.sleep(3200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(s + " " + new Date() + " " + Thread.currentThread().getName());
        }
    });
}

public static void main(final String[] args) throws InterruptedException, IOException {     
    hello("AAAAA");
    hello("CCCCC");
    hello("BBBBBB");
    System.in.read();
}

Result:

AAAAA Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-2
BBBBBB Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-4
CCCCC Thu Oct 05 09:43:49 CEST 2017 RxIoScheduler-3

I was actually expecting to get a TimeoutException from the thread named "RxIoScheduler-3" since it has been sleeping for 3 seconds.

What's wrong with my code and my approach to timeouts in RxJava?

Thank you for helping me.


Solution

  • According to the docs, the timeout operator will:

    mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items

    So a timeout occurs only when there is a delay in emitting items. Your delay is in the subscriber, which consumes the item after it has already been emitted, so it cannot trigger the timeout.

    If you rework your code to pause during emission, then a timeout will occur. For example:

    public static void hello(String name) throws IOException {
        Observable<String> obs2 = Observable.fromCallable(() -> {
                    if ("CCCCC".equals(name)) {
                        // pause for 150ms before emitting "CCCCC"
                        try {
                            Thread.sleep(150);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return name;
                })
                // time out if more than 100ms pass without an emission
                .timeout(100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());

        obs2.subscribe(
                // onNext: print the emitted item
                s -> System.out.println(s + " " + new Date() + " " + Thread.currentThread().getName()),
                // onError: print the error type (TimeoutException for "CCCCC")
                throwable -> System.err.println(throwable.getClass().getSimpleName() + " " + new Date() + " " + Thread.currentThread().getName()));
    }
    

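    Using the above form of hello(), you'll get the following output written to the console. The TimeoutException is reported on a computation thread because timeout() runs its timer on Schedulers.computation() by default: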

    AAAAA Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-2
    BBBBBB Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-4
    TimeoutException Thu Oct 05 10:10:33 IST 2017 RxComputationScheduler-1
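
The difference between a slow producer and a slow consumer can also be reproduced with only the JDK, which may make it easier to see what the timer actually measures. The following is a rough analogy, not RxJava: it assumes Java 9+ and uses CompletableFuture.orTimeout in place of the timeout operator. The class name TimeoutAnalogy and the helpers pause, slowProducer and slowConsumer are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

class TimeoutAnalogy {

    // Hypothetical helper: sleep without forcing callers to handle InterruptedException.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Slow producer: the value is produced after the 100 ms deadline,
    // so the future is completed with a TimeoutException instead.
    static String slowProducer() {
        try {
            return CompletableFuture
                    .supplyAsync(() -> { pause(300); return "CCCCC"; })
                    .orTimeout(100, TimeUnit.MILLISECONDS)
                    .join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException
            return e.getCause().getClass().getSimpleName();
        }
    }

    // Slow consumer: the value is produced immediately and the slow work
    // happens downstream of the timeout, so the deadline is never missed.
    static String slowConsumer() {
        try {
            return CompletableFuture
                    .supplyAsync(() -> "CCCCC")
                    .orTimeout(100, TimeUnit.MILLISECONDS)
                    .thenApply(s -> { pause(300); return s; })
                    .join();
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("slow producer: " + slowProducer()); // slow producer: TimeoutException
        System.out.println("slow consumer: " + slowConsumer()); // slow consumer: CCCCC
    }
}
```

As in the RxJava version, the deadline only covers the stage it is attached to. Work placed after it (the subscriber in the question, thenApply here) is not timed, so to bound that work it has to be moved upstream of the timeout.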