I want to perform an action, which is executed after subscribing to an Observable
. Currently I perform some BLE related operations like reading the rssi of my connection:
public class BleGattOperations {
private class BleGattCallback extends BluetoothGattCallback {
public void onReadRemoteRssi(BluetoothGatt gatt, int rssi, int status) {
readRssiSubject.onNext(new RssiState(gatt, rssi, status));
}
}
// Defined in Constructor
private final BluetoothGattCallback callback;
// Defined in Constructor
private BluetoothGatt gatt;
private final Subject<RssiState, RssiState> readRssiSubject = PublishSubject.create();
public Observable<RssiState> readRssi() {
return readRssiSubject
.doOnSubscribe(() -> {
gatt.readRemoteRssi();
})
.take(1);
}
}
Within the readRssi()
-method I subscribe to the readRssiSubject
, which emits items in the BleGattCallback.onReadRemoteRssi-Method()
. The read rssi operation on ble side is triggered via the gatt.readRemoteRssi()
operation inside the doOnSubscribe()
-block.
Unfortunately the doOnSubscribe-method is executed before any subscription to the subject is done and sometimes the read rssi operation is faster executed than the subscription to the readRssiSubject
is done. Therefore the newly subscribed subscriber does not receive the result of the rssi measurement as the subscription was done, after the onReadRemoteRssi
-method was executed.
Maybe some experimental code snippet helps to understand what is basically happen:
public Observable<RssiState> readRssi() {
return readRssiSubject
.doOnSubscribe(() -> {
readRssiSubject.onNext(new RssiState());
})
.take(1);
}
Is there a way to perform a action after a subscription was done to an Subject/Observable?
From doOnSubscribe
documentation:
Each subscription will result in an invocation of the given action except when the source ObservableSource is reference counted, in which case the source ObservableSource will invoke the given action for the first subscription.
And PublishSubject
is reference counted.
Although you are using PublishSubject
it does not look like you are going to share messages from it to more than one subscriber. In that case I would recommend the following implemetation of readRssi()
:
public Observable<RssiState> readRssi() {
return Observable.create(emitter -> {
// this is invoked for every new subscriber
BluetoothGattCallback callback =
(BluetoothGatt gatt, int rssi, int status) -> {
emitter.onNext(new RssiState(gatt, rssi, status));
emitter.onComplete();
};
gatt.addListener(callback); // not sure about method name
gatt.readRemoteRssi(); // initiate read
});
}
See documentation and example for create
method.