I have a custom class for uploading either a single file or multiple files to Firebase Storage:
public class FirebaseUploader {
private Context appContext;
private FirebaseStorage storage;
public FirebaseUploader(Context appContext) {
this.appContext = appContext;
this.storage = FirebaseStorage.getInstance();
}
// Uploading a single file
public Observable<String> send(Uri file) {
return toObservable(Observable.just(file));
}
// Uploading multiple files
public Observable<String> send(List<Uri> files) {
return toObservable(Observable.fromIterable(files));
}
private Observable<String> toObservable(Observable<Uri> observable) {
return observable.flatMap(this::upload);
}
private Observable<String> upload(Uri file) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
StorageReference branch = storage.getReference().child("images");
StorageReference leaf = branch.child(generateUniqueId(file));
UploadTask uploadTask = leaf.putFile(file);
uploadTask.continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
@Override
public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
if (!task.isSuccessful()) emitter.onError(task.getException());
return leaf.getDownloadUrl();
}
}).addOnCompleteListener(new OnCompleteListener<Uri>() {
@Override
public void onComplete(@NonNull Task<Uri> task) {
if (task.isSuccessful()) {
Uri downloadUri = task.getResult();
String url = downloadUri.toString();
emitter.onNext(url);
//emitter.onComplete();
} else {
// Handle failures
// ...
}
}
});
}
});
}
private String generateUniqueId(Uri file) {
String fileType = appContext.getContentResolver().getType(file);
String extension = (fileType.equals("image/jpeg")) ? ".jpg" : ".png";
String fileName = UUID.randomUUID().toString();
return fileName + extension;
}
}
It actually took me several days to come up with this solution because I initially had no clue how to convert the Firebase async calls to RxJava. Then I was stuck for quite a while wondering why the observable was not completing, until I eventually realized I was calling emitter.onNext
and emitter.onError
, but there was no emitter.onComplete
anywhere in my code!
So emitter.onComplete
is inside the OnCompleteListener
. But it's commented out — that's not where that call should be. What I've been searching for is a way to call .onComplete()
on the emitter
only after all the files have been processed. With my limited experience with RxJava, the only way I'm seeing is first doing a count of the number of files to be uploaded, then manually completing the sequence when that count is reached. Isn't there a better way?
emitter.onComplete()
is exactly where it should be.The Observable returned from your upload
method is responsible for one thing only: uploading the single file specified by the Uri parameter. That task is done when the UploadTask
completes, so you should call onComplete
at that time.
The flatMap
operator is responsible for collecting all of these results into a single Observable
. It will emit onComplete
downstream when all files have been uploaded, which is exactly what you want.