Search code examples
javaandroidmultithreadingrx-java

Android RxJava process http response rows in separate threads


I'm studying RxJava to see if I can use it to replace the deprecated AsynTasks in an application created several years ago.

my use case is as follows:

  1. make an http request on Schedulers.io that returns some rows
  2. process the rows separately, in parallel threads
  3. update the UI on main thread only when all rows have been processed

is there a way to do step 2 easily in rx java?

Below is a code example.

Thanks

Observable.fromCallable(()-> {

    // 1- get rows form server
    ArrayList<HashMap<String, Object>> rows = new ArrayList<HashMap<String, Object>>();

    // 2- process rows 
    for (HashMap row : rows) {
        //manipulate row
        row.put("test", "test");  <-- code that I want to parallelize
    }

    return rows;
})
        .subscribeOn(Schedulers.io())// Execute in IO thread, i.e. background thread.
        .observeOn(AndroidSchedulers.mainThread())// report or post the result to main thread.
        .subscribeWith(new Observer<ArrayList<HashMap<String, Object>>>() {

            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull ArrayList<HashMap<String, Object>> hashMaps) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {
                //3- update UI....
            }
        }); 

Solution

  • After several attempts I came to this solution.

    By adding logs to the processRow function I saw that it is called in parallel for multiple rows, as always, at the end the onComplete is called.

    Observable.fromCallable(() -> getListResponse()) // 1- get rows form server
                    .subscribeOn(Schedulers.io())
                    .flatMapIterable(rowItem -> rowItem) 
                    .flatMap(val -> Observable.just(val) //paralelize
                            .subscribeOn(Schedulers.computation())
                            .map(i -> processRow(i) )) // 2- process rows in parallel threads
                    .observeOn(AndroidSchedulers.mainThread()) 
                    .subscribe(new Observer<Object>() {
    
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            
                        }
    
                        @Override
                        public void onNext(@NonNull Object listResponse) { }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            
                            e.printStackTrace();
                            
                        }
    
                        @Override
                        public void onComplete() {
                           //3- update UI....
                        }
                    });