
How to download a file with progress updates using reactive extensions on Android


I am currently writing an app that has to combine many API calls, so I am using RxJava for the first time, since it seems more convenient for handling async events and the Android lifecycle.

However, the app sometimes also needs to load static datasets packaged in a zip archive. For consistency I use Rx for this operation as well, and it works well, but I don't really understand how to subscribe to progress events so that I can update the UI with the file download progress.

This is the code I currently use to download a file; it uses the OkHttp library:

downloadService.downloadFile(filename)
    .flatMap(new Func1<Response<ResponseBody>, Observable<File>>()
    {
        @Override
        public Observable<File> call(final Response<ResponseBody> responseBodyResponse)
        {
            return Observable.create(new Observable.OnSubscribe<File>()
            {
                @Override
                public void call(Subscriber<? super File> subscriber)
                {
                    try
                    {
                        File file = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);
                        BufferedSink sink = Okio.buffer(Okio.sink(file));
                        sink.writeAll(responseBodyResponse.body().source());
                        sink.close();
                        subscriber.onNext(file);
                        subscriber.onCompleted();
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                        subscriber.onError(e);
                    }
                }
            });
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<File>()
    {
        @Override
        public void onCompleted()
        {
            Log.d("downloadZipFile", "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            Log.d("downloadZipFile", "Error " + e.getMessage());
        }

        @Override
        public void onNext(File file) {
            Log.d("downloadZipFile", "File downloaded to " + file.getAbsolutePath());
        }
    });

What is a good way of implementing a subscription to progress events?


Solution

  • First of all: never use Observable.create() unless you know what you are doing and are prepared to handle backpressure and everything else an Observable is supposed to handle. (UPDATE: with RxJava 2 you can safely use the create() method.)

    That said, start by creating a class to hold your progress information:

    public class DownloadProgress<DATA> {
        private final float progress;
        private final DATA data;
    
        public DownloadProgress(float progress) {
            this.progress = progress;
            this.data = null;
        }
    
        public DownloadProgress(@NonNull DATA data) {
            this.progress = 1f;
            this.data = data;
        }
    
        public float getProgress() {
            return progress;
        }
    
        public boolean isDone() {
            return data != null;
        }
    
        public DATA getData() {
            return data;
        }
    }
    

    Then you can do something like this:

    public Observable<DownloadProgress<File>> downloadFile(@NonNull final String filename) {
        return downloadService.downloadFile(filename)
                .switchMap(response -> Observable.fromEmitter(emitter -> {
                    ResponseBody body = response.body();
                    final long contentLength = body.contentLength();
                    ForwardingSource forwardingSource = new ForwardingSource(body.source()) {
                        private long totalBytesRead = 0L;
    
                        @Override
                        public long read(Buffer sink, long byteCount) throws IOException {
                            long bytesRead = super.read(sink, byteCount);
                            // read() returns the number of bytes read, or -1 if this source is exhausted.
                            totalBytesRead += bytesRead != -1 ? bytesRead : 0;
                            boolean done = bytesRead == -1;
                            // Use the running total, not the size of the last chunk, for the fraction.
                            float progress = done ? 1f : (float) totalBytesRead / contentLength;
                            emitter.onNext(new DownloadProgress<>(progress));
                            return bytesRead;
                        }
                    };
                    emitter.setCancellation(body::close);
                    try {
                        File saveLocation = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);
                        saveLocation.getParentFile().mkdirs();
                        BufferedSink sink = Okio.buffer(Okio.sink(saveLocation));
                        sink.writeAll(forwardingSource);
                        sink.close();
                        emitter.onNext(new DownloadProgress<>(saveLocation));
                        emitter.onCompleted();
                    } catch (IOException e) {
                        // RxJava 1 Emitter; with an RxJava 2 emitter, tryOnError(e) would be used instead.
                        emitter.onError(e);
                    }
                }, Emitter.BackpressureMode.LATEST));
    }
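
The progress calculation above divides by body.contentLength(), which OkHttp reports as -1 when the response does not declare a length. A minimal sketch of a guard for that case follows, in plain Java. ProgressMath and the -1 "indeterminate" sentinel are hypothetical names and conventions chosen for illustration; they are not part of OkHttp or RxJava.

```java
// Hypothetical helper, not part of OkHttp or RxJava.
final class ProgressMath {
    private ProgressMath() {}

    /**
     * Converts a running byte count into a fraction in [0, 1].
     * Returns -1 (an assumed "indeterminate" marker) when the length is unknown.
     */
    static float fraction(long totalBytesRead, long contentLength, boolean done) {
        if (done) {
            return 1f;
        }
        if (contentLength <= 0) {
            return -1f; // length unknown: the caller can show an indeterminate spinner
        }
        float f = (float) totalBytesRead / contentLength;
        // Clamp in case the server sends more bytes than it declared.
        return Math.min(1f, Math.max(0f, f));
    }

    public static void main(String[] args) {
        System.out.println(fraction(512, 2048, false));  // a quarter of the way through
        System.out.println(fraction(100, -1, false));    // unknown length
        System.out.println(fraction(2048, 2048, true));  // finished
    }
}
```

Inside read(), the call would then look like ProgressMath.fraction(totalBytesRead, contentLength, done), and the UI side would have to treat a negative value as "unknown".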
    

    fromEmitter() is a recent addition to RxJava; it is currently experimental but works very well (it was recently renamed from fromAsync()).

    You can then use it like this:

        String filename = "yourFileName";
        downloadFile(filename)
                // run the network call and the file write off the main thread
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(fileDownloadProgress -> {
                    float progress = fileDownloadProgress.getProgress();
                    // TODO update UI
                })
                .filter(DownloadProgress::isDone)
                .map(DownloadProgress::getData)
                .subscribe(file -> {
                    // file downloaded
                }, throwable -> {
                    // error
                });
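
Because read() is called once per buffer, doOnNext can fire far more often than the progress bar visibly changes. One possible way to limit UI updates is to forward a value only when its whole-number percentage changes; inside the Rx chain a similar effect can usually be had with map() followed by distinctUntilChanged(). The sketch below shows the idea in plain Java; PercentThrottle is a hypothetical helper, not part of any library.

```java
import java.util.function.IntConsumer;

// Hypothetical helper: forwards progress only when the rounded percentage changes.
// Not thread-safe; it is meant to be called from a single thread (e.g. the main thread).
final class PercentThrottle {
    private final IntConsumer onPercentChanged;
    private int lastPercent = -1; // nothing reported yet

    PercentThrottle(IntConsumer onPercentChanged) {
        this.onPercentChanged = onPercentChanged;
    }

    /** Accepts a progress fraction; values outside [0, 1] are clamped. */
    void accept(float progress) {
        float clamped = Math.max(0f, Math.min(1f, progress));
        int percent = Math.round(clamped * 100f);
        if (percent != lastPercent) {
            lastPercent = percent;
            onPercentChanged.accept(percent);
        }
    }

    public static void main(String[] args) {
        PercentThrottle throttle = new PercentThrottle(p -> System.out.println(p + "%"));
        float[] samples = {0.001f, 0.004f, 0.5f, 0.502f, 1f};
        for (float sample : samples) {
            throttle.accept(sample); // prints 0%, 50%, 100% (duplicates are skipped)
        }
    }
}
```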
    

    This should work (I haven't tested it), but it doesn't let you cancel the ongoing HTTP call when you unsubscribe.

    If you can modify your download service, I would rather handle progress through OkHttp directly, following the Progress recipe from the OkHttp samples:

    public Observable<DownloadProgress<File>> downloadFile(@NonNull final String url, @NonNull final File saveLocation) {
        return Observable.fromEmitter(emitter -> {
            Request request = new Request.Builder()
                    .url(url)
                    .build();
    
            final ProgressListener progressListener = (bytesRead, contentLength, done) -> {
                // range [0,1]
                float progress = done ? 1f : (float) bytesRead / contentLength;
                emitter.onNext(new DownloadProgress<>(progress));
            };
    
            OkHttpClient client = new OkHttpClient.Builder()
                    .addNetworkInterceptor(chain -> {
                        Response originalResponse = chain.proceed(chain.request());
                        return originalResponse.newBuilder()
                                .body(new ProgressResponseBody(originalResponse.body(), progressListener))
                                .build();
                    })
                    .build();
    
            final Call call = client.newCall(request);
            emitter.setCancellation(() -> call.cancel());
    
            try {
                Response response = call.execute();
                BufferedSink sink = Okio.buffer(Okio.sink(saveLocation));
                sink.writeAll(response.body().source());
                sink.close();
                emitter.onNext(new DownloadProgress<>(saveLocation));
                emitter.onCompleted();
            } catch (IOException e) {
                emitter.onError(e);
            }
        }, Emitter.BackpressureMode.LATEST);
    }
    

    You also need the following classes:

    public static class ProgressResponseBody extends ResponseBody {
    
        private final ResponseBody responseBody;
        private final ProgressListener progressListener;
        private BufferedSource bufferedSource;
    
        public ProgressResponseBody(ResponseBody responseBody, ProgressListener progressListener) {
            this.responseBody = responseBody;
            this.progressListener = progressListener;
        }
    
        @Override
        public MediaType contentType() {
            return responseBody.contentType();
        }
    
        @Override
        public long contentLength() {
            return responseBody.contentLength();
        }
    
        @Override
        public BufferedSource source() {
            if (bufferedSource == null) {
                bufferedSource = Okio.buffer(source(responseBody.source()));
            }
            return bufferedSource;
        }
    
        private Source source(Source source) {
            return new ForwardingSource(source) {
                long totalBytesRead = 0L;
    
                @Override
                public long read(Buffer sink, long byteCount) throws IOException {
                    long bytesRead = super.read(sink, byteCount);
                    // read() returns the number of bytes read, or -1 if this source is exhausted.
                    totalBytesRead += bytesRead != -1 ? bytesRead : 0;
                    progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
                    return bytesRead;
                }
            };
        }
    }
    
    interface ProgressListener {
        void update(long bytesRead, long contentLength, boolean done);
    }
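
The forwarding-wrapper idea behind ProgressResponseBody is not specific to Okio. As a rough illustration, here is the same pattern sketched with plain java.io, so it can be run without OkHttp on the classpath. ProgressInputStream and ByteProgressListener are hypothetical names; the listener simply mirrors the shape of the ProgressListener interface above.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Mirrors the ProgressListener above; redeclared so this sketch compiles on its own.
interface ByteProgressListener {
    void update(long bytesRead, long contentLength, boolean done);
}

// Hypothetical java.io analogue of the ForwardingSource wrapper.
final class ProgressInputStream extends FilterInputStream {
    private final long contentLength;
    private final ByteProgressListener listener;
    private long totalBytesRead = 0L;

    ProgressInputStream(InputStream in, long contentLength, ByteProgressListener listener) {
        super(in);
        this.contentLength = contentLength;
        this.listener = listener;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        report(b == -1 ? -1 : 1);
        return b;
    }

    // FilterInputStream.read(byte[]) delegates here, so bytes are counted only once.
    @Override
    public int read(byte[] buffer, int offset, int length) throws IOException {
        int n = super.read(buffer, offset, length);
        report(n);
        return n;
    }

    // n is the number of bytes just read, or -1 once the stream is exhausted.
    private void report(int n) {
        if (n > 0) {
            totalBytesRead += n;
        }
        listener.update(totalBytesRead, contentLength, n == -1);
    }
}

final class ProgressInputStreamDemo {
    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[10_000];
        InputStream in = new ProgressInputStream(
                new ByteArrayInputStream(payload), payload.length,
                (read, total, done) ->
                        System.out.println(read + "/" + total + (done ? " done" : "")));
        byte[] chunk = new byte[4096];
        while (in.read(chunk) != -1) {
            // drain the stream; the listener is notified on every read
        }
    }
}
```

As in the Okio version, the listener receives the cumulative byte count, and the final call (when read returns -1) carries done = true.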
    

    Usage:

        String filename = "yourFileName";
        String url = "http://your.url.here";
        File saveLocation = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);
        downloadFile(url, saveLocation)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(fileDownloadProgress -> {
                    float progress = fileDownloadProgress.getProgress();
                    // TODO update UI
                })
                .filter(DownloadProgress::isDone)
                .map(DownloadProgress::getData)
                .subscribe(file -> {
                    // file downloaded
                }, throwable -> {
                    // error
                });