I'm building an app that exchanges data with an external device through a bluetooth spp connection. After searching a bit here on how to maintain the connection between activities I found the better way is to implement a Service and bind it to the activities. I think the code I wrote is right but when I move from the first to the second activity and I send a request to the external device I loose some messages of the answer from the read buffer. If I wait about 5 seconds before make the request it works fine.
Here are my classes, I'm using a Bluetooth library called BlueFlow:
class BluetoothService : Service() {
private val btChannel: BtChannel by inject()
inner class LocalBinder: Binder() {
val bindService = this@BluetoothService
}
override fun onBind(intent: Intent?): IBinder {
return LocalBinder()
}
fun sendRequest(data: ByteArray): Boolean {
return btChannel.send(data)
}
fun readChannel(): LiveData<ByteArray>? {
return btChannel.read()?.cancellable()?.asLiveData()
}
}
class BtChannel(private val blueFlow: BlueFlow) {
fun read(): Flow<ByteArray>? {
return blueFlow.getIO()?.readByteArrayStream()
}
fun send(data: ByteArray): Boolean {
return blueFlow.getIO()?.send(data) ?: false
}
}
// This is the readByteArrayStream function from the blueFlow library
@ExperimentalCoroutinesApi
fun readByteArrayStream(
delayMillis: Long = 1000,
minExpectedBytes: Int = 2,
bufferCapacity: Int = 1024,
readInterceptor: (ByteArray) -> ByteArray? = { it }
): Flow<ByteArray> = channelFlow {
if (inputStream == null) {
throw NullPointerException("inputStream is null. Perhaps bluetoothSocket is also null")
}
val buffer = ByteArray(bufferCapacity)
val byteAccumulatorList = mutableListOf<Byte>()
while (isActive) {
try {
if (inputStream.available() < minExpectedBytes) {
delay(delayMillis)
continue
}
val numBytes = inputStream.read(buffer)
val readBytes = buffer.trim(numBytes)
if (byteAccumulatorList.size >= bufferCapacity)
byteAccumulatorList.clear()
byteAccumulatorList.addAll(readBytes.toList())
val interceptor = readInterceptor(byteAccumulatorList.toByteArray())
if (interceptor == null)
delay(delayMillis)
interceptor?.let {
offer(it)
byteAccumulatorList.clear()
}
} catch (e: IOException) {
byteAccumulatorList.clear()
closeConnections()
error("Couldn't read bytes from flow. Disconnected")
} finally {
if (bluetoothSocket?.isConnected != true) {
byteAccumulatorList.clear()
closeConnections()
break
}
}
}
}.flowOn(Dispatchers.IO)
class FirstActivity: AppCompatActivity() {
private lateinit var viewModel: FirstViewModel
private lateinit var mBluetoothService: BluetoothService
private var mBound = false
private val serviceConnection: ServiceConnection = object : ServiceConnection {
override fun onServiceConnected(name: ComponentName?, service: IBinder?) {
val binder = service as BluetoothService.LocalBinder
mBluetoothService = binder.bindService
mBound = true
}
override fun onServiceDisconnected(name: ComponentName?) {
mBound = false
}
}
override fun onStart() {
super.onStart()
startBluetoothService()
setObservers()
}
override fun onStop() {
super.onStop()
unbindService(serviceConnection)
mBound = false
removeObservers()
}
private fun startBluetoothService() {
val intent = Intent(this, BluetoothService::class.java)
bindService(intent, serviceConnection, Context.BIND_AUTO_CREATE)
}
private fun setObservers() {
viewModel.hours.observe(this) {hoursTv?.text = it }
mBluetoothService?.readChannel()?.observe(this, { viewModel.getData(it) })
}
private fun removeObservers() {
viewModel.hours.removeObservers(this@FirstActivity)
mBluetoothService?.readChannel()?.removeObservers(this@FirstActivity)
}
}
class FirstViewModel: ViewModel() {
val hours: MutableLiveData<String> by lazy {
MutableLiveData<String>()
}
fun getData(bytes: ByteArray) = viewModelScope.launch {
getDataSafeCall(bytes)
}
@ExperimentalUnsignedTypes
private fun getDataSafeCall(bytes: ByteArray) {
PacketFrame.decodePacket(bytes.toUByteArray(), bytes.size).onEach { updateData(it) }.launchIn(viewModelScope)
}
fun updateData(packet: HashMap<PacketHeader, UByteArray>) {
packet.forEach { packet ->
when (packet.key) {
PacketHeader.Hour -> {
hours.value = PacketFrame.payloadToString(packet.value)
}
else -> {
}
}
}
}
}
class SecondActivity: AppCompatActivity() {
//same as FirstActivity
}
class SecondViewModel: ViewModel() {
// same as FirstViewModel
}
PacketFrame is a singleton. I use it to compose packets received from bluetooth with a custom protocol.
Do you have any suggestion?
UPDATE:
I'm reading the doc trying to find where is the issue and I found this:
Flows are cold Flows are cold streams similar to sequences — the code inside a flow builder does not run until the flow is collected.
But I also found this:
channelFlow Creates an instance of a cold Flow with elements that are sent to a SendChannel provided to the builder’s block of code via ProducerScope. It allows elements to be produced by code that is running in a different context or concurrently. The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow. This builder ensures thread-safety and context preservation, thus the provided ProducerScope can be used concurrently from different contexts. The resulting flow completes as soon as the code in the block and all its children completes.
What I understand is that channelFlow (used for the readByteArrayStream function in the library) continues to run until the consumer is alive and requests elements.
In my code I launch the coroutine in the ViewModel that it's not cleared when I call the second Activity because it remains in the stack, so the function on the first ViewModel continues to receive data from bluetooth until the flow is created in the second ViewModel and it's consumed using LiveData e observing it.
What do you think about it? Any suggestion on how to solve?
I will try to suggest a couple of things although it's hard to know what you are trying to achieve and why you want to launch a new activity in the middle of a readout.
I'd recommend changing your readChannel()
return type into SharedFlow
and convert the readByteArrayStream()
function into SharedFlow as well using shareIn() call. Then collect it as is without converting it into livedata. This library was built before StateFlow
and SharedFlow
were introduced so these new flow types might come in handy in this case.
Another approach might be to use SharedFlow
again that will act as an event bus and re-emit the data collected by readByteArrayStream()
see here for an example of such an EventBus implementation.
That's all I can suggest for now. I'm really not sure about your use case and I would like to check a sample project linked to the relevant issue if it's possible for better understanding.