Search code examples
androidrx-java2rxandroidble

How to enable/disable to notification/indication in RxAndroidBLE


I am creating a RxJava2 chain where in I want to enable and disable notification. the flow I am setting is as follows.

  1. establish a connection.
  2. set the notification to READ_STATUS UUID.
  3. if the returned byte is zero then perform a write byte 01 to WRITE_STATUS UUID and after WRITE_STATUS, enable the notification of READ_STATUS UUID to verify it has byte value 1.
  4. else if the returned byte is 1 then just enable other indicators (UUID1, UUID2,UUD3) and read the value.

I have a problem at step 2 and 3 where I am reading the value of READ_STATUS UUID by enabling the notification. in order to re-read the value, I probably need to disable the notification and then again enable it. And to disable to the notification I have to dispose that particular setupNotification .

Code is as follows

  connectDisposable=

                    device.establishConnection(false)

                   .flatMap(rxBleConnection -> {

                       rxBleConnection.discoverServices();
                       mRxBleConnection = rxBleConnection;
                       return Observable.just(rxBleConnection);
                   })
                   .flatMap(rxBleConnection ->mRxBleConnection.setupNotification(READ_STATUS,NotificationSetupMode.QUICK_SETUP).flatMap(it->it))
                   .takeUntil(bytes -> { 
                        
                        if(getByteValue(bytes)==0)
                            return false;// dispose above to disable the notification
                        else
                            return true; // no need to disable the notification and continue writing
                    })
                            

                     .flatMap(bytes -> {

                        return Observable.zip(

                                mRxBleConnection.writeCharacteristic(WRITE_STATUS, new byte[]{1}).toObservable(),
                                // setupNotification again to check whether read status has 1 or not 
                                mRxBleConnection.setupNotification(READ_STATUS, NotificationSetupMode.QUICK_SETUP).flatMap(it->it),
                                Pair::new

                        );
                    })
                     .flatMap(bytes ->{

                        byte [] val= bytes.first;
                        if(getByteValue(val) == 1){

                            return Observable.zip(
                                    mRxBleConnection.setupIndication(HISTORY, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG,"Here 1 ")).flatMap(it -> it),
                                    mRxBleConnection.setupIndication(PARAMCHECK, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG,"Here 2 ")).flatMap(it -> it),
                                    mRxBleConnection.setupIndication(FAULTINFO, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG,"Here 3 ")).flatMap(it -> it),
                                  
                                    Data::Readings);
                        }
                       
                        return Observable.empty();
                    }).subscribe(data -> {
                    
                    });

The problem with this code is my takeUntil is firing at the last it does not dispose the previous setupNotificaion operation so that I can re read it later.

I tried solution mentioned over this thread but unfortunately I am not sharing the RxBleConnection


Solution

  • The problem with this code is my takeUntil is firing at the last it does not dispose the previous setupNotificaion operation so that I can re read it later.

    The problem is that your condition is inverted. From .takeUntil() Javadoc:

     * @return an Observable that first emits items emitted by the source Observable, checks the specified
     *         condition after each item, and then completes when the condition is satisfied.
    

    You have used:

                       .takeUntil(bytes -> { 
                            
                            if(getByteValue(bytes)==0)
                                return false;// dispose above to disable the notification
                            else
                                return true; // no need to disable the notification and continue writing
                        })
    

    where it should be satisfied (return true) when the upstream should get disposed:

                       .takeUntil(bytes -> { 
                            
                            if(getByteValue(bytes)==0)
                                return true;// dispose above to disable the notification
                            else
                                return false; // no need to disable the notification and continue writing
                        })