I'm making an Android application with some BLE interaction using the RxAndroidBle API. I followed the guidelines and samples from https://github.com/Polidea/RxAndroidBle
I establish a BLE connection with a specified device. While connected, I can read and write characteristics with no problem, but when I try to set up a notification for the battery level characteristic I get the following throwable error message: "Already connected to device with MAC address XX:XX..."
I really don't understand the error in that context, since I can read and write characteristics with no problem.
I want to set up a notification for this characteristic after an initial read of its value, for a specific purpose.
Here is sample code that reproduces my problem:
    private lateinit var device: RxBleDevice
    private var connectionObservable: Observable<RxBleConnection>? = null
    private var rxBleConnection: RxBleConnection? = null
    private val connectionDisposable = CompositeDisposable()
    private val connectionStateDisposable = CompositeDisposable()
    private var notifyValueChangeSubscription = CompositeDisposable()
    var enableBatteryNotificationRunnable: Runnable = Runnable {
        enableBatteryNotification()
    }
    private var myHandler = Handler()
    val DELAY_BEFORE_ENABLE_NOTIFICATION: Long = 100

    private fun connect() {
        connectionObservable = device.establishConnection(false)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
        connectionObservable?.let {
            connectionDisposable.add(it.subscribe(
                { rxBleConnection ->
                    this.rxBleConnection = rxBleConnection
                },
                { _ ->
                    Log.e("connect", "connexion error")
                })
            )
        }
        val state = device.observeConnectionStateChanges().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        connectionStateDisposable.add(
            state.subscribe(
                { connectionState ->
                    Log.i("connect", "connexion state :$connectionState")
                    if(connectionState == RxBleConnection.RxBleConnectionState.CONNECTED) {
                        myHandler.postDelayed(enableBatteryNotificationRunnable, DELAY_BEFORE_ENABLE_NOTIFICATION);
                    }
                }
            )
            { _ ->
                Log.e("connection listener", "connexion state error")
            }
        )
    }

    private fun enableBatteryNotification () {
        connectionObservable?.let {
            var observableToReturn = it
                .flatMap { it.setupNotification(UUID_BATTERY_LEVEL) }
                .doOnNext {
                    Log.i("NOTIFICATION", "doOnNext")
                }
                .flatMap { it }
                .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
            notifyValueChangeSubscription.add(observableToReturn.subscribe({ bytes ->
                    var strBytes = String(bytes)
                    Log.i("NOTIFICATION", "value change: $strBytes")
                },
                { throwable ->
                    Log.e("NOTIFICATION", "Error in notification process: " + throwable.message)
                })
            )
        }
    }
Thanks in advance for any help :)
setupNotification returns “Error already connected” whereas no connection request is sent
Two connection requests are actually made, hence the error. From the RxBleDevice.establishConnection() Javadoc:
 * Establishes connection with a given BLE device. {@link RxBleConnection} is a handle, used to process BLE operations with a connected
 * device.
In your code there are two subscriptions to the establishConnection() Observable, and each subscription starts its own connection attempt.
    private lateinit var device: RxBleDevice
    private var connectionObservable: Observable<RxBleConnection>? = null
    private var rxBleConnection: RxBleConnection? = null
    private val connectionDisposable = CompositeDisposable()
    private val connectionStateDisposable = CompositeDisposable()
    private var notifyValueChangeSubscription = CompositeDisposable()
    var enableBatteryNotificationRunnable: Runnable = Runnable {
        enableBatteryNotification()
    }
    private var myHandler = Handler()
    val DELAY_BEFORE_ENABLE_NOTIFICATION: Long = 100

    private fun connect() {
        connectionObservable = device.establishConnection(false)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
        connectionObservable?.let {
            connectionDisposable.add(it.subscribe( // << Here is the first subscription
                { rxBleConnection ->
                    this.rxBleConnection = rxBleConnection
                },
                { _ ->
                    Log.e("connect", "connexion error")
                })
            )
        }
        val state = device.observeConnectionStateChanges().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        connectionStateDisposable.add(
            state.subscribe(
                { connectionState ->
                    Log.i("connect", "connexion state :$connectionState")
                    if(connectionState == RxBleConnection.RxBleConnectionState.CONNECTED) {
                        myHandler.postDelayed(enableBatteryNotificationRunnable, DELAY_BEFORE_ENABLE_NOTIFICATION);
                    }
                }
            )
            { _ ->
                Log.e("connection listener", "connexion state error")
            }
        )
    }

    private fun enableBatteryNotification () {
        connectionObservable?.let {
            var observableToReturn = it
                .flatMap { it.setupNotification(UUID_BATTERY_LEVEL) }
                .doOnNext {
                    Log.i("NOTIFICATION", "doOnNext")
                }
                .flatMap { it }
                .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
            notifyValueChangeSubscription.add(observableToReturn.subscribe({ bytes -> // << Here is the second subscription
                    var strBytes = String(bytes)
                    Log.i("NOTIFICATION", "value change: $strBytes")
                },
                { throwable ->
                    Log.e("NOTIFICATION", "Error in notification process: " + throwable.message)
                })
            )
        }
    }
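To make the mechanism concrete, here is a toy model in plain Kotlin. It uses neither RxJava nor RxAndroidBle: FakeDevice and ColdSource are hypothetical stand-ins invented for this illustration, and the sketch only assumes that a cold source repeats its work for every subscriber, which is how the two subscriptions above end up requesting two connections.

```kotlin
// Hypothetical stand-in for a BLE device that allows one connection at a time.
class FakeDevice(private val mac: String) {
    private var connected = false

    fun connect(): String {
        // Mirrors the error from the question when a second connection is requested.
        check(!connected) { "Already connected to device with MAC address $mac" }
        connected = true
        return "connection to $mac"
    }
}

// Hypothetical stand-in for a cold Observable: the producer runs once per subscriber.
class ColdSource<T>(private val producer: () -> T) {
    fun subscribe(onNext: (T) -> Unit, onError: (Throwable) -> Unit) {
        try {
            onNext(producer())
        } catch (e: Exception) {
            onError(e)
        }
    }
}

fun main() {
    val device = FakeDevice("AA:BB:CC:DD:EE:FF")
    val connectionSource = ColdSource { device.connect() }

    // First subscription: the producer runs and the connection succeeds.
    connectionSource.subscribe({ println("1st: $it") }, { println("1st error: ${it.message}") })
    // Second subscription: the producer runs again and the device refuses.
    connectionSource.subscribe({ println("2nd: $it") }, { println("2nd error: ${it.message}") })
}
```

In this sketch the first subscriber receives the connection and the second one receives the "Already connected" error, even though connect() appears only once in the source.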
This situation is a common source of confusion for people learning RxJava. There are three ways to fix it, listed from least to most work:

Share the establishConnection() Observable
It is possible to share a single RxBleConnection with RxReplayingShare. Change this:
    connectionObservable = device.establishConnection(false)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
To this:
    connectionObservable = device.establishConnection(false)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .compose(ReplayingShare.instance())

Use the rxBleConnection: RxBleConnection? property
Instead of:
    connectionObservable?.let {
        var observableToReturn = it
            .flatMap { it.setupNotification(UUID_BATTERY_LEVEL) }
            .doOnNext {
                Log.i("NOTIFICATION", "doOnNext")
            }
            .flatMap { it }
            .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        notifyValueChangeSubscription.add(observableToReturn.subscribe({ bytes -> // << Here is the second subscription
                var strBytes = String(bytes)
                Log.i("NOTIFICATION", "value change: $strBytes")
            },
            { throwable ->
                Log.e("NOTIFICATION", "Error in notification process: " + throwable.message)
            })
        )
    }
Make it:
    // Use the lambda parameter: a nullable var property cannot be smart-cast inside let.
    rxBleConnection?.let { connection ->
        val observableToReturn = connection.setupNotification(UUID_BATTERY_LEVEL)
            .doOnNext {
                Log.i("NOTIFICATION", "doOnNext")
            }
            .flatMap { it }
            .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        notifyValueChangeSubscription.add(observableToReturn.subscribe({ bytes ->
                val strBytes = String(bytes)
                Log.i("NOTIFICATION", "value change: $strBytes")
            },
            { throwable ->
                Log.e("NOTIFICATION", "Error in notification process: " + throwable.message)
            })
        )
    }
This is discouraged because you may end up with an RxBleConnection that is no longer valid: it may have been disconnected before enableBatteryNotification() is called.

Restructure the flow around a single .subscribe()
This is a custom solution tailored to your exact use case. Unfortunately, the information you have provided is not enough to create a drop-in code replacement, but it could look something like this:
    device.establishConnection(false)
        .flatMap { connection ->
            Observable.merge(
                connection.readCharacteristic(uuid0).map { ReadResult(uuid0, it) }.toObservable(),
                connection.setupNotification(uuid1).flatMap { it }.map { NotifyResult(uuid1, it) }.delaySubscription(100, TimeUnit.MILLISECONDS)
            )
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            { /* handle ReadResult/NotifyResult */ },
            { /* handle potential errors */ }
        )
Where ReadResult and NotifyResult would be data classes that take a UUID and a ByteArray.
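As a rough sketch of what those two classes might look like: the sealed parent BleResult and the describe() helper are assumptions added here for illustration, not part of RxAndroidBle. The UUID in main() is the standard Bluetooth Battery Level characteristic (0x2A19), used only as sample data.

```kotlin
import java.util.UUID

// Hypothetical common parent so one handler can cover both kinds of result.
sealed class BleResult

// Note: the generated equals()/hashCode() compare the ByteArray by reference, not by content.
data class ReadResult(val uuid: UUID, val value: ByteArray) : BleResult()
data class NotifyResult(val uuid: UUID, val value: ByteArray) : BleResult()

// Hypothetical handler that could be called from the onNext lambda of the single subscribe().
fun describe(result: BleResult): String = when (result) {
    is ReadResult -> "read ${result.uuid}: ${result.value.size} byte(s)"
    is NotifyResult -> "notification ${result.uuid}: ${result.value.size} byte(s)"
}

fun main() {
    val batteryLevel = UUID.fromString("00002a19-0000-1000-8000-00805f9b34fb")
    println(describe(ReadResult(batteryLevel, byteArrayOf(87))))
    println(describe(NotifyResult(batteryLevel, byteArrayOf(86))))
}
```

With a shared parent type, the when expression is exhaustive, so the compiler flags any result kind the handler forgets to cover.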