Search code examples
react-nativerx-javareact-native-ble-plx

Where should I put onBackPressureBuffer(n) in a RxJava subscription chain?


I'm trying to hotfix an existing React Native library react-native-ble-plx adding onBackPressureBuffer() in existing Java code.

I know this is ugly but I have no time to submit a PR right now and there is a pending issue that may solve the problem. I'm doing this because the event emitter works at 200Hz. I need a safe way to buffer items on native side while they're consumed at their own pace on JavaScript side.

So the code becomes like the following:

       final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
            @Override
            public Observable<Observable<byte[]>> call() {
                int properties = gattCharacteristic.getProperties();
                BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                        .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
                NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                        : NotificationSetupMode.COMPAT;
                if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
                    return connection.setupNotification(gattCharacteristic, setupMode);
                }

                if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
                    return connection.setupIndication(gattCharacteristic, setupMode);
                }

                return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
            }
        }).onBackpressureBuffer(1000)  <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
            @Override
            public Observable<byte[]> call(Observable<byte[]> observable) {
                return observable;
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }
        }).subscribe(new Observer<byte[]>() {
            @Override
            public void onCompleted() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onError(Throwable e) {
                errorConverter.toError(e).reject(promise);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onNext(byte[] bytes) {
                characteristic.logValue("Notification from", bytes);
                WritableArray jsResult = Arguments.createArray();
                jsResult.pushNull();
                jsResult.pushMap(characteristic.toJSObject(bytes));
                jsResult.pushString(transactionId);
                sendEvent(Event.ReadEvent, jsResult);
            }
        });

My problem is that even with that addition, I'm experiencing MissingBackPressure exceptions.

I've tried onBackPressureDrop() and I have exactly the same behavior. So I assume I'm doing it wrong, but can't figure out why right now.

Any help appreciated.


Solution

  • As you have said you are facing an issue with an react-native library and the above code did throw MissingBackpressureException previously.

    From Javadoc of .onBackpressureDrop() (bolding mine):

    Instructs an Observable that is emitting items faster than its observer can consume them to discard, rather than emit, those items that its observer is not prepared to observe.

    If the downstream request count hits 0 then the Observable will refrain from calling {@code onNext} until the observer invokes {@code request(n)} again to increase the request count.

    Backpressure:
    The operator honors backpressure from downstream and consumes the source {@code Observable} in an unbounded manner (i.e., not applying backpressure to it).
    Scheduler:
    {@code onBackpressureDrop} does not operate by default on a particular {@link Scheduler}.

    You can see that the next operators in the chain are .flatMap(), .doOnUnsubscribe() and .subscribe().

    From Javadoc of .flatMap() regarding backpressure:

    Backpressure:
    The operator honors backpressure from downstream. The outer {@code Observable} is consumed in unbounded mode (i.e., no backpressure is applied to it). The inner {@code Observable}s are expected to honor backpressure; if violated, the operator may signal {@code MissingBackpressureException}.

    Javadoc .doOnUnsubscribe():

    Backpressure:
    {@code doOnUnsubscribe} does not interact with backpressure requests or value delivery; backpressure behavior is preserved between its upstream and its downstream.

    And .subscribe():

    Backpressure:
    The operator consumes the source {@code Observable} in an unbounded manner (i.e., no backpressure is applied to it).

    As you can see none of the operators below .onBackpressure*() does apply backpressure on it. You would need to add an operator that does it right after .onBackpressure*(). One of such operators is .observeOn(Scheduler)

    Javadoc .observeOn():

    Backpressure: This operator honors backpressure from downstream and expects it from the source {@code Observable}. Violating this expectation will lead to {@code MissingBackpressureException}. This is the most common operator where the exception pops up; look for sources up the chain that don't support backpressure, such as {@code interval}, {@code timer}, {code PublishSubject} or {@code BehaviorSubject} and apply any of the {@code onBackpressureXXX} operators before applying applying {@code observeOn} itself.

    So a workable code could look like this:

    final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
        @Override
        public Observable<Observable<byte[]>> call() {
            int properties = gattCharacteristic.getProperties();
            BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                    .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
            NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                    : NotificationSetupMode.COMPAT;
            if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
                return connection.setupNotification(gattCharacteristic, setupMode);
            }
    
            if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
                return connection.setupIndication(gattCharacteristic, setupMode);
            }
    
            return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
        }
    })
    .flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
        @Override
        public Observable<byte[]> call(Observable<byte[]> observable) {
            return observable;
        }
    })
    .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            promise.resolve(null);
            transactions.removeSubscription(transactionId);
        }
    })
    .onBackpressureBuffer(1000) // <---- Here is my modification
    .observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
    .subscribe(new Observer<byte[]>() {
        @Override
        public void onCompleted() {
            promise.resolve(null);
            transactions.removeSubscription(transactionId);
        }
    
        @Override
        public void onError(Throwable e) {
            errorConverter.toError(e).reject(promise);
            transactions.removeSubscription(transactionId);
        }
    
        @Override
        public void onNext(byte[] bytes) {
            characteristic.logValue("Notification from", bytes);
            WritableArray jsResult = Arguments.createArray();
            jsResult.pushNull();
            jsResult.pushMap(characteristic.toJSObject(bytes));
            jsResult.pushString(transactionId);
            sendEvent(Event.ReadEvent, jsResult);
        }
    });