Search code examples
javastreamrx-java

How to make an asynchronous producer/source in Java?


I have a list of tasks that needs to be processed by a method, the process takes a long time, so I'd like the tasks in the list to be processed one by one in asynchronous mode and return the results as an asynchronous stream, so downstream processing does not need to wait the completion of the whole list of tasks:

AsyncStream<R> methodA(List<T> tasks){tasks.forEach(t -> {calculation that takes a long time})}

After a brief search on the internet, I found RxJava can handle asynchronous streaming data, but the introductions does seem to explain how to create the asynchronous data stream. So how to create a asynchronous producer/source in Java?


Solution

  • You can create asynchronous Observable that will emit values as soon as the computation for given task is finished. You will need flatMap operator for this. In a simplified example this would look like :

    static Observable<String> methodA(List<String> tasks) {
         return Observable.from(tasks)
                .flatMap(t -> Observable.just(t)
                        .map(t1 -> longRunningTask(t1))
                        .subscribeOn(Schedulers.io())
                );
    
    }
    
    static String longRunningTask(String arg) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return arg;
    }
    

    You map your tasks to Observable and use subscribeOn so that when something subscribes to them the subscription happens on different thread. flatMap operator subscribes to all of those Observables at once and emits values as soon as they are ready. The computation is async because subscription happens in different threads from Scedulers.io pool.