Tags: android, android-asynctask, rx-java, rx-android

How to manually call observer.onNext in rxJava


I am relatively new to RxJava/RxAndroid. Until now I have been using AsyncTask for my long-running tasks. I have converted all of my AsyncTasks to RxJava except this one. The particular problem is calling something like AsyncTask's publishProgress(params) from the background thread, which I need in order to update the progress of a ProgressBar.

First, this is the AsyncTask code:

private static class AddBooksToDatabase extends AsyncTask<String, String, String> {
    //dependencies removed

    AddBooksToDatabase(AddBooksDbParams params) {
        //Removed assignment codes
    }

    @Override
    protected String doInBackground(String... strings) {
        //Initializing custom SQLiteOpenHelper and SQLite database
        File mFile = new File(mFolderPath);

        int booksSize = getFilesInFolder(mFile).size();
        String[] sizeList = {String.valueOf(booksSize)};
        //The first publishProgress is used to set the max of the progressbar
        publishProgress(sizeList);

        for (int i = 0; i < booksSize; i++) {
            //publishProgress with current item, current file
            publishProgress(String.valueOf(i), getFilesInFolder(mFile).get(i).getName());
            //Inserting current items in database. Code removed
        }
        return null;
    }

    @Override
    protected void onPreExecute() {
        //Show ProgressBar
    }

    @Override
    protected void onPostExecute(String s) {
        //Hide ProgressBar
    }

    @Override
    protected void onProgressUpdate(String... values) {
        super.onProgressUpdate(values);
        if (values.length == 1) {
            //The first call to publishProgress
            mProgressBar.setMax(Integer.parseInt(values[0]));
        } else {
            //Subsequent calls to publish progress
            Log.i(TAG, "Current item is " + values[0] + " and current file is " + values[1]);
            infoText.setText(values[1]);
            mProgressBar.setProgress(Integer.parseInt(values[0]), true);

        }
    }

    @Override
    protected void onCancelled() {
        cancel(true);
    }
}

Code Using RxJava

final Observable<String[]> addBooksObserver = Observable.create(new Observable.OnSubscribe<String[]>() {
        @Override
        public void call(Subscriber<? super String[]> subscriber) {
            subscriber.onNext(setAddSubscription());
            subscriber.onCompleted();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

private String[] setAddSubscription() {
    //Initializing custom SQLiteOpenHelper and SQLite database
    File mFile = new File(mFolderPath);

    int booksSize = getFilesInFolder(mFile).size();
    String[] sizeList = {String.valueOf(booksSize)};
    //The first publishProgress is used to set the max of the progressbar
    addBooksObserver.doOnNext(addReturnParams(String.valueOf(sizeList.length), null, null));

    for (int i = 0; i < booksSize; i++) {
        EpubReader reader = new EpubReader();
        //publishProgress with current item, current file*
        addBooksObserver.doOnNext(addReturnParams(String.valueOf(sizeList.length),
                String.valueOf(i), getFilesInFolder(mFile).get(i).getName()));
        //Inserting current item in database. Code removed
    }
    return null;
}

private String[] addReturnParams(String totalItems, String currentItem, String currentFile) {
    return new String[]{totalItems, currentItem, currentFile};
}

The problem is that the lines starting with addBooksObserver.doOnNext(addReturnParams( show this error: doOnNext (rx.functions.Action1) cannot be applied to (java.lang.String[])

I have no idea how to fix this, because I thought that since setAddSubscription() and addReturnParams(String totalItems, String currentItem, String currentFile) both return a String array, this shouldn't be a problem. Can you please help me out?


Solution

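  • You just have to pass the values to the onNext method of your subscriber, not the doOnNext method of your observable. doOnNext expects a callback (an Action1) that runs whenever an item is emitted; it does not emit anything itself, which is why passing it a String[] does not compile.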
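
To make the difference concrete, here is a minimal plain-Java analogy that needs no RxJava on the classpath. Sink and tap are hypothetical names invented for this sketch; they only stand in for a subscriber and for a doOnNext-style operator, and this is not how RxJava implements either one. The point is that registering a callback delivers nothing, while calling onNext pushes a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java analogy only; Sink and tap are hypothetical names, not RxJava types.
class CallbackVersusValue {

    /** Stand-in for a subscriber: a value is delivered by calling onNext. */
    interface Sink<T> {
        void onNext(T value);
    }

    /**
     * Stand-in for a doOnNext-style operator: it accepts a callback and
     * returns a wrapped sink. Calling it delivers nothing by itself.
     */
    static <T> Sink<T> tap(Sink<T> downstream, Consumer<T> sideEffect) {
        return value -> {
            sideEffect.accept(value);   // run the side effect first
            downstream.onNext(value);   // then pass the value along unchanged
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Sink<String[]> ui = values -> log.add("ui:" + values[0]);

        // Registering a callback: nothing has been delivered yet.
        Sink<String[]> tapped = tap(ui, values -> log.add("tap:" + values[0]));
        log.add("registered");

        // Pushing a value: this is the counterpart of publishProgress.
        tapped.onNext(new String[]{"3"});
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [registered, tap:3, ui:3]
    }
}
```

In the question's code, the String[] was handed to the place where a callback belongs, so the compiler rejected it.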

    You also have to subscribe to the observable. Try something like this:

    Observable.create(new Observable.OnSubscribe<String[]>() {
      @Override
      public void call(Subscriber<? super String[]> subscriber) {
        setAddSubscription(subscriber);
        subscriber.onCompleted();
      }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<String[]>() {
      @Override
      public void onCompleted() {
       // handle 'operation is done', e.g. hide the ProgressBar
      }
    
      @Override
      public void onError(Throwable e) {
    
      }
    
      @Override
      public void onNext(String[] values) {
        //addReturnParams always emits {totalItems, currentItem, currentFile}
        if (values[1] == null) {
            //The first emission only carries the total, used as the progressbar max
            mProgressBar.setMax(Integer.parseInt(values[0]));
        } else {
            //Subsequent emissions carry the current item and the current file
            Log.i(TAG, "Current item is " + values[1] + " and current file is " + values[2]);
            infoText.setText(values[2]);
            mProgressBar.setProgress(Integer.parseInt(values[1]), true);
        }
      }
    });
    

    You also need to modify your private method a little so that it receives the subscriber and emits through it:

    private void setAddSubscription(Subscriber<? super String[]> subscriber) {
      //Initializing custom SQLiteOpenHelper and SQLite database
      File mFile = new File(mFolderPath);
    
      int booksSize = getFilesInFolder(mFile).size();
      //The first emission carries only the total, used to set the max of the progressbar
      subscriber.onNext(addReturnParams(String.valueOf(booksSize), null, null));
    
      for (int i = 0; i < booksSize; i++) {
        EpubReader reader = new EpubReader();
        //Emit the total, the current item and the current file
        subscriber.onNext(addReturnParams(String.valueOf(booksSize),
                String.valueOf(i), getFilesInFolder(mFile).get(i).getName()));
        //Inserting current item in database. Code removed
      }
        //Inserting current item in database. Code removed
      }
    
    }
    
    private String[] addReturnParams(String totalItems, String currentItem, String currentFile) {
      return new String[]{totalItems, currentItem, currentFile};
    }
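
As a side note, the String[] forces the receiver to remember which position holds what. Below is a rough sketch of the same push pattern with a small typed value instead, written in plain Java so it runs without RxJava. Progress, ProgressSink and addBooks are hypothetical names for illustration only; ProgressSink stands in for the Subscriber, and in this sketch the worker calls onCompleted itself, whereas the code above does that in call().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical names throughout; ProgressSink stands in for the Subscriber.
class TypedProgressSketch {

    /** One progress update; currentFile is null for the initial "total only" event. */
    static final class Progress {
        final int total;
        final int current;
        final String currentFile;

        Progress(int total, int current, String currentFile) {
            this.total = total;
            this.current = current;
            this.currentFile = currentFile;
        }

        boolean isInitial() {
            return currentFile == null;
        }
    }

    interface ProgressSink {
        void onNext(Progress progress);
        void onCompleted();
    }

    /** Worker: receives the sink as a parameter and pushes updates into it. */
    static void addBooks(List<String> fileNames, ProgressSink sink) {
        int total = fileNames.size();
        sink.onNext(new Progress(total, 0, null));   // first event: total only
        for (int i = 0; i < total; i++) {
            sink.onNext(new Progress(total, i, fileNames.get(i)));
            // inserting the current item into the database would happen here
        }
        sink.onCompleted();
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        addBooks(Arrays.asList("a.epub", "b.epub"), new ProgressSink() {
            @Override
            public void onNext(Progress p) {
                if (p.isInitial()) {
                    log.add("max=" + p.total);               // would call setMax
                } else {
                    log.add(p.current + ":" + p.currentFile); // would call setProgress
                }
            }

            @Override
            public void onCompleted() {
                log.add("done");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [max=2, 0:a.epub, 1:b.epub, done]
    }
}
```

With RxJava the same idea would mean using an Observable of such a value type instead of Observable<String[]>, so the onNext handler reads named fields rather than parsing array slots.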