Search code examples
androidrx-java

Perform action AFTER subscribing to observable


I want to perform an action, which is executed after subscribing to an Observable. Currently I perform some BLE related operations like reading the rssi of my connection:

public class BleGattOperations {

   private class BleGattCallback extends BluetoothGattCallback {
       public void onReadRemoteRssi(BluetoothGatt gatt, int rssi, int status) {
           readRssiSubject.onNext(new RssiState(gatt, rssi, status));
       }
   }

    // Defined in Constructor
    private final BluetoothGattCallback callback;
    // Defined in Constructor
    private BluetoothGatt gatt;
    private final Subject<RssiState, RssiState> readRssiSubject = PublishSubject.create();

    public Observable<RssiState> readRssi() {
        return readRssiSubject
            .doOnSubscribe(() -> {
                 gatt.readRemoteRssi();
            })
            .take(1);
    }
}

Within the readRssi()-method I subscribe to the readRssiSubject, which emits items in the BleGattCallback.onReadRemoteRssi-Method(). The read rssi operation on ble side is triggered via the gatt.readRemoteRssi() operation inside the doOnSubscribe()-block.

Unfortunately the doOnSubscribe-method is executed before any subscription to the subject is done and sometimes the read rssi operation is faster executed than the subscription to the readRssiSubject is done. Therefore the newly subscribed subscriber does not receive the result of the rssi measurement as the subscription was done, after the onReadRemoteRssi-method was executed.

Maybe some experimental code snippet helps to understand what is basically happen:

public Observable<RssiState> readRssi() {
    return readRssiSubject
        .doOnSubscribe(() -> {
             readRssiSubject.onNext(new RssiState());
        })
        .take(1);
}

Is there a way to perform a action after a subscription was done to an Subject/Observable?


Solution

  • From doOnSubscribe documentation:

    Each subscription will result in an invocation of the given action except when the source ObservableSource is reference counted, in which case the source ObservableSource will invoke the given action for the first subscription.

    And PublishSubject is reference counted.

    Alternative solution

    Although you are using PublishSubject it does not look like you are going to share messages from it to more than one subscriber. In that case I would recommend the following implemetation of readRssi():

    public Observable<RssiState> readRssi() {
        return Observable.create(emitter -> {
                 // this is invoked for every new subscriber
                 BluetoothGattCallback callback = 
                     (BluetoothGatt gatt, int rssi, int status) -> {
                         emitter.onNext(new RssiState(gatt, rssi, status));
                         emitter.onComplete();
                     };
                 gatt.addListener(callback); // not sure about method name
                 gatt.readRemoteRssi(); // initiate read
            });
    }
    

    See documentation and example for create method.