java android firebase-storage rx-java2

Converting an async call to RxJava, how to manually complete after all items are processed?


I have a custom class for uploading either a single file or multiple files to Firebase Storage:

public class FirebaseUploader {
    private Context appContext;
    private FirebaseStorage storage;

    public FirebaseUploader(Context appContext) {
        this.appContext = appContext;
        this.storage = FirebaseStorage.getInstance();
    }

    // Uploading a single file
    public Observable<String> send(Uri file) {
        return toObservable(Observable.just(file));
    }

    // Uploading multiple files
    public Observable<String> send(List<Uri> files) {
        return toObservable(Observable.fromIterable(files));
    }

    private Observable<String> toObservable(Observable<Uri> observable) {
        return observable.flatMap(this::upload);
    }

    private Observable<String> upload(Uri file) {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                StorageReference branch = storage.getReference().child("images");
                StorageReference leaf = branch.child(generateUniqueId(file));
                UploadTask uploadTask = leaf.putFile(file);

                uploadTask.continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
                    @Override
                    public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
                        if (!task.isSuccessful()) emitter.onError(task.getException());
                        return leaf.getDownloadUrl();
                    }
                }).addOnCompleteListener(new OnCompleteListener<Uri>() {
                    @Override
                    public void onComplete(@NonNull Task<Uri> task) {
                        if (task.isSuccessful()) {
                            Uri downloadUri = task.getResult();
                            String url = downloadUri.toString();
                            emitter.onNext(url);
                            //emitter.onComplete();
                        } else {
                            // Handle failures
                            // ...
                        }
                    }
                });
            }
        });
    }

    private String generateUniqueId(Uri file) {
        String fileType = appContext.getContentResolver().getType(file);
        String extension = (fileType.equals("image/jpeg")) ? ".jpg" : ".png";
        String fileName = UUID.randomUUID().toString();
        return fileName + extension;
    }
}

It actually took me several days to come up with this solution because I initially had no clue how to convert the Firebase async calls to RxJava. Then I was stuck for quite a while wondering why the observable was not completing, until I eventually realized I was calling emitter.onNext and emitter.onError, but there was no emitter.onComplete anywhere in my code!

The emitter.onComplete call does appear inside the OnCompleteListener, but I commented it out because I don't think that's where it belongs. What I've been looking for is a way to call onComplete() on the emitter only after all the files have been processed. With my limited RxJava experience, the only approach I can see is to count the files to be uploaded up front and manually complete the sequence once that count is reached. Isn't there a better way?


Solution

  • Your emitter.onComplete() is exactly where it should be.

    The Observable returned from your upload method is responsible for one thing only: uploading the single file specified by the Uri parameter. That task is done when the UploadTask completes, so you should call onComplete at that time.

    The flatMap operator is responsible for merging all of these results into a single Observable. It emits onComplete downstream only once the source Observable and every inner upload Observable have completed, that is, when all files have been uploaded, which is exactly what you want.
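
To make the completion rule concrete, here is a minimal plain-Java sketch of the bookkeeping that a merging operator such as flatMap performs on your behalf. It is a toy model, not RxJava's actual implementation: the Sink and Source interfaces, fakeUpload, merge and run are hypothetical names introduced only for illustration, and the "uploads" emit a placeholder URL synchronously instead of talking to Firebase. The point it tries to show is that the merged stream can only complete if each per-file source signals its own completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FlatMapCompletionSketch {

    // Hypothetical stand-in for an RxJava emitter/observer: items plus a completion signal.
    interface Sink {
        void onNext(String item);
        void onComplete();
    }

    // Hypothetical stand-in for the Observable returned by upload(Uri).
    interface Source {
        void subscribe(Sink sink);
    }

    // Fake single-file upload: emits one placeholder URL and completes only if asked to.
    static Source fakeUpload(String name, boolean callOnComplete) {
        return sink -> {
            sink.onNext("https://example.invalid/" + name);
            if (callOnComplete) {
                sink.onComplete();
            }
        };
    }

    // Toy merge: forwards every item and completes downstream once ALL inner sources completed.
    static Source merge(List<Source> inners) {
        return downstream -> {
            if (inners.isEmpty()) {
                downstream.onComplete();
                return;
            }
            int[] remaining = { inners.size() };
            for (Source inner : inners) {
                inner.subscribe(new Sink() {
                    @Override
                    public void onNext(String item) {
                        downstream.onNext(item);
                    }

                    @Override
                    public void onComplete() {
                        // The counting the question wanted to avoid lives here, inside the operator.
                        if (--remaining[0] == 0) {
                            downstream.onComplete();
                        }
                    }
                });
            }
        };
    }

    // Runs two fake uploads through merge and records every event seen downstream.
    static List<String> run(boolean callOnComplete) {
        List<String> events = new ArrayList<>();
        List<Source> uploads = Arrays.asList(
                fakeUpload("a.jpg", callOnComplete),
                fakeUpload("b.png", callOnComplete));
        merge(uploads).subscribe(new Sink() {
            @Override
            public void onNext(String item) {
                events.add("next " + item);
            }

            @Override
            public void onComplete() {
                events.add("complete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // each upload completes, so the merged stream completes
        System.out.println(run(false));  // no per-upload onComplete, so "complete" never appears
    }
}
```

In this model, run(false) corresponds to the code in the question with emitter.onComplete() commented out: both URLs arrive, but the merged stream never finishes. Calling onComplete per upload, as in run(true), is what lets the merge step finish the overall sequence without any counting in your own code.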