Search code examples
multithreadingrx-java2scheduling

Thread pipelining with RxJava


RxJava gurus, here is your chance to shine!

Can you ensure the following program does not throw an IllegalStateException by only changing the RxJava pipeline starting with Flowable.generate() in the main() method?

class ExportJob {
    private static Scheduler singleThread(String threadName) {
        return Schedulers.from(newFixedThreadPool(1, r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        }));
    }

    public static void main(String[] args) {
        Scheduler genSched = singleThread("genThread");
        Scheduler mapSched = singleThread("mapThread");
        // execute on "genThread"
        Flowable.generate(ExportJob::infiniteGenerator)
                .subscribeOn(genSched, false)
                // execute on "mapThread"
                .observeOn(mapSched, false)
                .concatMapMaybe(ExportJob::mapping)
                // execute on the thread that creates the pipeline, block it until finished
                .blockingForEach(ExportJob::terminal);
    }

    private static int nb;
    /** Must execute on "genThread" thread. */
    private static void infiniteGenerator(Emitter<Integer> emitter) {
        print(nb, "infiniteGenerator");
        emitter.onNext(nb++);
        checkCurrentThread("genThread");
    }

    /** Must execute on "mapThread" thread. */
    private static Maybe<Integer> mapping(Integer s) {
        print(s, "mapping");
        checkCurrentThread("mapThread");
        return Maybe.just(s);
    }

    /** Must execute on "terminal" thread. */
    private static void terminal(Integer s) {
        print(s, "terminal");
        checkCurrentThread("main");
    }

    private static void print(int item, String method) {
        System.out.format("%d - %s - %s()%n", item, Thread.currentThread().getName(), method);
    }

    private static void checkCurrentThread(String expectedThreadName) throws IllegalStateException {
        String name = Thread.currentThread().getName();
        if (!name.equals(expectedThreadName)) {
            throw new IllegalStateException("Thread changed from '" + expectedThreadName + "' to '" + name + "'");
        }
    }
}

Solution

  • You have to use subscribeOn(scheduler, true) so the requests are routed back to their expected threads as well:

    Flowable.generate(ExportJob::infiniteGenerator)
                .subscribeOn(genSched, true)  // <------------------------------
                // execute on "mapThread"
                .observeOn(mapSched, false)
                .concatMapMaybe(ExportJob::mapping)
                .subscribeOn(mapSched, true)  // <------------------------------
                .blockingForEach(ExportJob::terminal);