I am relatively new to RxJava/RxAndroid. Until now I have been using AsyncTask for my long-running tasks.
I have converted most of my AsyncTasks to RxJava, except this one.
The particular problem I am having is calling something like AsyncTask's publishProgress(params) from
the background thread. I need to do this to update the progress of a ProgressBar.
First, this is the code in AsyncTask:
private static class AddBooksToDatabase extends AsyncTask<String, String, String> {
    //dependencies removed

    AddBooksToDatabase(AddBooksDbParams params) {
        //Removed assignment codes
    }

    @Override
    protected String doInBackground(String... strings) {
        //Initializing custom SQLiteOpenHelper and SQLite database
        File mFile = new File(mFolderPath);
        int booksSize = getFilesInFolder(mFile).size();
        String[] sizeList = {String.valueOf(booksSize)};
        //The first publishProgress is used to set the max of the progressbar
        publishProgress(sizeList);
        for (int i = 0; i < booksSize; i++) {
            //publishProgress with current item, current file
            publishProgress(String.valueOf(i), getFilesInFolder(mFile).get(i).getName());
            //Inserting current items in database. Code removed
        }
        return null;
    }

    @Override
    protected void onPreExecute() {
        //Show ProgressBar
    }

    @Override
    protected void onPostExecute(String s) {
        //Hide ProgressBar
    }

    @Override
    protected void onProgressUpdate(String... values) {
        if (values.length == 1) {
            //The first call to publishProgress
        } else {
            //Subsequent calls to publish progress
            Log.i(TAG, "Current item is " + values[0] + " and current file is " + values[1]);
            mProgressBar.setProgress(Integer.parseInt(values[0]), true);
        }
    }

    @Override
    protected void onCancelled() {
    }
}
Code using RxJava:
final Observable<String[]> addBooksObserver = Observable.create(new Observable.OnSubscribe<String[]>() {
    @Override
    public void call(Subscriber<? super String[]> subscriber) {
        subscriber.onNext(setAddSubscription());
    }
});

private String[] setAddSubscription() {
    //Initializing custom SQLiteOpenHelper and SQLite database
    File mFile = new File(mFolderPath);
    int booksSize = getFilesInFolder(mFile).size();
    String[] sizeList = {String.valueOf(booksSize)};
    //The first publishProgress is used to set the max of the progressbar
    addBooksObserver.doOnNext(addReturnParams(String.valueOf(sizeList.length), null, null));
    for (int i = 0; i < booksSize; i++) {
        EpubReader reader = new EpubReader();
        //publishProgress with current item, current file*
        addBooksObserver.doOnNext(addReturnParams(String.valueOf(sizeList.length),
                String.valueOf(i), getFilesInFolder(mFile).get(i).getName()));
        //Inserting current item in database. Code removed
    }
    return null;
}

private String[] addReturnParams(String totalItems, String currentItem, String currentFile) {
    return new String[]{totalItems, currentItem, currentFile};
}
The problem is that the lines calling addBooksObserver.doOnNext(addReturnParams(...))
produce this error:
doOnNext (rx.functions.Action1) cannot be applied to (java.lang.String[])
I don't have any idea how to fix this, because I thought that since setAddSubscription()
and addReturnParams(String totalItems, String currentItem, String currentFile)
both return a String array, this shouldn't be a problem. Can you please help me out?
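You just have to pass the values to the onNext method of your subscriber, not the doOnNext
method of your observable! As the error message says, doOnNext expects an Action1 callback
rather than an item, which is why passing a String[] to it does not compile.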
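To make the difference concrete, here is a minimal JDK-only sketch. It does not use RxJava: java.util.function.Consumer stands in for both the subscriber's onNext and the Action1 callback, and EmitSketch, produce, and the sample values ("3", "a.epub") are made up for illustration. The point is only that the producing code hands each value to onNext, while something shaped like doOnNext would be given a callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in, not an RxJava class.
final class EmitSketch {

    // Plays the role of OnSubscribe.call(subscriber): it pushes each value
    // to the subscriber by calling onNext with the value itself.
    static void produce(Consumer<String[]> onNext) {
        onNext.accept(new String[]{"3"});            // sample: total number of items
        onNext.accept(new String[]{"0", "a.epub"});  // sample: current item, current file
    }

    // Collects what the callback receives, so the flow is easy to inspect.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        // This is the kind of argument a doOnNext-style method expects:
        // a callback that receives String[] items, not a String[] itself.
        Consumer<String[]> action = values -> seen.add(String.join(",", values));
        produce(action);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```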
You also have to subscribe to the observable. Try something like this for your observable:
Observable.create(new Observable.OnSubscribe<String[]>() {
    @Override
    public void call(Subscriber<? super String[]> subscriber) {
        setAddSubscription(subscriber);
        subscriber.onCompleted();
    }
})
.subscribe(new Subscriber<String[]>() {
    @Override
    public void onCompleted() {
        // handle 'operation is done'
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String[] values) {
        if (values.length == 1) {
            //The first call to publishProgress
        } else {
            //Subsequent calls to publish progress
            Log.i(TAG, "Current item is " + values[0] + " and current file is " + values[1]);
            mProgressBar.setProgress(Integer.parseInt(values[0]), true);
        }
    }
});
You also need to modify your private methods a little bit:
private void setAddSubscription(Subscriber<? super String[]> subscriber) {
    //Initializing custom SQLiteOpenHelper and SQLite database
    File mFile = new File(mFolderPath);
    int booksSize = getFilesInFolder(mFile).size();
    String[] sizeList = {String.valueOf(booksSize)};
    //The first publishProgress is used to set the max of the progressbar
    //sizeList[0] holds the number of books; sizeList.length would always be 1
    subscriber.onNext(addReturnParams(sizeList[0], null, null));
    for (int i = 0; i < booksSize; i++) {
        EpubReader reader = new EpubReader();
        //publishProgress with current item, current file*
        subscriber.onNext(addReturnParams(sizeList[0],
                String.valueOf(i), getFilesInFolder(mFile).get(i).getName()));
        //Inserting current item in database. Code removed
    }
}

private String[] addReturnParams(String totalItems, String currentItem, String currentFile) {
    return new String[]{totalItems, currentItem, currentFile};
}
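One thing to watch with the String[] approach: if every emission goes through addReturnParams, the array always has three elements, so a values.length == 1 check never matches and values[0] holds the total rather than the current item. A small value class avoids that kind of positional bookkeeping. The sketch below is JDK-only and hypothetical: Progress, ProgressSketch, and addBooks are illustrative names, Consumer<Progress> stands in for the subscriber's onNext, and the file names are sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical immutable progress event; replaces the positional String[].
final class Progress {
    final int total;        // number of books to process
    final int current;      // zero-based index of the current book, -1 for the start event
    final String fileName;  // current file name, null for the start event

    Progress(int total, int current, String fileName) {
        this.total = total;
        this.current = current;
        this.fileName = fileName;
    }

    // True only for the first event, which carries just the total.
    boolean isStart() {
        return current < 0;
    }
}

final class ProgressSketch {

    // Stand-in for setAddSubscription(subscriber): emits one start event,
    // then one event per file.
    static void addBooks(List<String> files, Consumer<Progress> onNext) {
        onNext.accept(new Progress(files.size(), -1, null));
        for (int i = 0; i < files.size(); i++) {
            onNext.accept(new Progress(files.size(), i, files.get(i)));
            // inserting the current item into the database would go here
        }
    }

    // Runs addBooks over two sample file names and records every event.
    static List<Progress> run() {
        List<Progress> events = new ArrayList<>();
        addBooks(Arrays.asList("a.epub", "b.epub"), events::add);
        return events;
    }
}
```

In the Rx version this would correspond to an Observable<Progress> whose subscriber calls mProgressBar.setMax(p.total) when p.isStart() and mProgressBar.setProgress(p.current, true) otherwise. Since AsyncTask delivered progress on the UI thread, the chain would presumably also need .subscribeOn(Schedulers.io()) and .observeOn(AndroidSchedulers.mainThread()) (the latter from RxAndroid) before subscribe.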