Continuously subscribing and unsubscribing from Observable for BLE scans


I'd like to implement a feature where a BLE scan runs for 10 seconds, pauses for 10 seconds, and repeats for as long as the view is active.

I'm using RxAndroidBle, which wraps the Android BLE APIs in a reactive interface. I have a method, scanForRange(getConnectAddress()), which takes an Observable<RxBleScanResult>, applies some filters and a map, and returns a simple Observable<Integer>. Subscribing to this observable starts the scan, and unsubscribing from it stops the scan.

My first thought was to use Observable.interval() as shown below:

@Override
protected void onResume() {
    super.onResume();

    mRangeScanSubscription = Observable.interval(10000, TimeUnit.MILLISECONDS)
            .flatMap(new Func1<Long, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(Long aLong) {
                    return scanForRange(getConnectAddress());
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.d(TAG, "onResume() error");
                    throwable.printStackTrace();
                }
            })
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Toast.makeText(DeviceDetailActivity.this, "Range: " + integer, Toast.LENGTH_SHORT).show();
                }
            });
}

I would then unsubscribe from that Subscription in my onPause(). This does not work, however: once the first interval tick is emitted, the scan is subscribed to and emits results continuously (instead of the intended start/stop every 10 seconds), because flatMap never unsubscribes from the inner scan observable.

What would be the best way to implement this feature then? I assume I could probably hack it together with a mix of Observables and Handlers, but that seems wrong.


Solution

  • You can do this with the takeUntil and repeatWhen operators:

            mRangeScanSubscription = scanForRange(getConnectAddress())
                .takeUntil(Observable.timer(10, TimeUnit.SECONDS)) // completes (and so stops the scan) after 10 seconds
                .repeatWhen(completed -> completed.delay(10, TimeUnit.SECONDS)) // resubscribes 10 seconds after each completion
                ...
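To make the resulting timing concrete, here is a minimal plain-Java sketch of the on/off timeline that the takeUntil/repeatWhen chain above should produce. It is only a model, not RxJava or RxAndroidBle code: the class ScanDutyCycle and its methods are hypothetical names introduced for illustration, and it assumes an idealised case where subscribing is instantaneous and the scan observable never completes or errors on its own (repeatWhen only reacts to completion, so an error would end the cycle).

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model of the scan/pause timeline produced by
 * takeUntil(timer(scan)) followed by repeatWhen(delay(pause)).
 * Idealised: ignores subscription latency and scan errors.
 */
final class ScanDutyCycle {
    private final long scanMillis;
    private final long pauseMillis;

    ScanDutyCycle(long scan, long pause, TimeUnit unit) {
        if (scan <= 0 || pause < 0) {
            throw new IllegalArgumentException("scan must be > 0 and pause >= 0");
        }
        this.scanMillis = unit.toMillis(scan);
        this.pauseMillis = unit.toMillis(pause);
    }

    /** True if a scan would be running elapsedMillis after the first subscription. */
    boolean isScanning(long elapsedMillis) {
        if (elapsedMillis < 0) {
            return false;
        }
        long period = scanMillis + pauseMillis; // one full scan + pause cycle
        return (elapsedMillis % period) < scanMillis;
    }

    /** Number of scan windows that have started by elapsedMillis. */
    long scansStarted(long elapsedMillis) {
        if (elapsedMillis < 0) {
            return 0;
        }
        return elapsedMillis / (scanMillis + pauseMillis) + 1;
    }

    public static void main(String[] args) {
        ScanDutyCycle cycle = new ScanDutyCycle(10, 10, TimeUnit.SECONDS);
        // Print the modelled state every 5 seconds for the first minute.
        for (long t = 0; t <= 60_000; t += 5_000) {
            System.out.println(t / 1000 + "s scanning=" + cycle.isScanning(t)
                    + " scansStarted=" + cycle.scansStarted(t));
        }
    }
}
```

Under these assumptions one full cycle lasts 20 seconds (10 scanning, 10 paused), so the scan restarts at 0 s, 20 s, 40 s and so on. Unsubscribing mRangeScanSubscription in onPause() ends the cycle at whatever point it has reached.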