Search code examples
androidrx-javarxandroidble

Using createNewLongWriteBuilder with RxAndroidBLE Correctly


I have a device that can only accept 20 bytes at a time on the virtual serial port (using BLE). If I'm not mistaken, createNewLongWriteBuilder seems to be the perfect method for this.

Here is my attempt:

String newNameMsg = "SOME STRING THAT IS LONGER THAN 20 CHARACTERS";

byte[] byteMsg = newNameMsg.getBytes(Charset.forName("UTF-8"));
byte[] endLine = hexStringToByteArray("0D"); // signifies end of line for my device
byte[] newName = new byte[byteMsg.length + endLine.length];
System.arraycopy(byteMsg, 0, newName, 0, byteMsg.length);
System.arraycopy(endLine, 0, newName, byteMsg.length, endLine.length);


connectionObservable
    .flatMap(rxBleConnection -> rxBleConnection.createNewLongWriteBuilder()
    .setCharacteristicUuid(characteristicUuid)
    .setBytes(newName)
    .setMaxBatchSize(20) // my device only accepts 20 characters at a time.
    .setWriteOperationAckStrategy(new RxBleConnection.WriteOperationAckStrategy() {
            @Override
            public Observable<Boolean> call(Observable<Boolean> booleanObservable) {
                return Observable.just(true); // this is supposed to tell the LongWriteBuilder that we should continue sending data, correct?
            }
        })
    .build()
    )
    .subscribe(
        byteArray -> {
        // Written data.
        Log.i("BLE Controller","Data has been written!");
        },
        throwable -> {
        // Handle an error here.
        }
    );

Actual Results: The device does not receive any data, but the logs show:

D/RxBle#Radio: QUEUED RxBleRadioOperationCharacteristicLongWrite(107353908) D/RxBle#Radio: STARTED RxBleRadioOperationCharacteristicLongWrite(107353908) I/BLE Controller: Data has been written!

D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicLongWrite(107353908)

UPDATE 03/30/2017:

My original understanding was incorrect and had many issues with it. I was sending a subscription instead of an Observable.

s_noopy pointed out that:

The WriteOperationAckStrategy is effectively an equavilent of ? Observable.repeatWhen() operator. The filtering should happen inside the WOAS to trigger the repeat when ready.`

Current Situation:

I need to wait for my device to clear the TX flag before I can send the next batch. To do this, I need to implement setWriteOperationAckStrategy but I need to read the TX flag to see if it is clear before sending the next batch.

My attempt:

@Override
public Observable<Boolean> call(Observable<Boolean> objectObservable) {
    return connectionObservable
                .flatMap(rxBleConnection -> rxBleConnection.readCharacteristic(serialTX.getUuid()))
                .observeOn(AndroidSchedulers.mainThread());
}

UPDATE 07/30/2017:

Modified s_noopy's code so that I now have:

final ByteArrayBatchObservable byteArrayBatchObservable = new ByteArrayBatchObservable(newName, 19);
Log.i("BLE Controller","sending updated name"); 
byteArrayBatchObservable.flatMap(bytesBatch -> // using batches of data to write...
    connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(vspT, bytesBatch) 
        .flatMap(writtenBytes -> { // ...and when each batch will be written...
            final Func1<byte[], Boolean> filterFunction = txBytes -> checkIfZero(txBytes);
            return rxBleConnection
                    .readCharacteristic(vspT.getUuid()) 
                    .repeat() // ...and repeat it...
                    .takeUntil(filterFunction)
                    .filter(filterFunction) // ...but don't emit anything until then...
                    .map(readBytes -> writtenBytes); // ...and emit the writtenBytesBatch...
        }
    ), 1)

)
.subscribe(
    byteArray -> {
        // Written data.
        Log.i("BLE Controller","BT has been renamed! :" + byteArray.toString());
    },
    throwable -> {
        // Handle an error here.
        Log.i("BLE Controller","BT rename ERROR");

    }
);

public boolean checkIfZero(byte[] txBytes){
    Log.i("BLE Controller", "checking if tx is cleared: " +txBytes.toString());
    for (byte b : txBytes) {
        if (b != 0) {
            return false;
        }
    }
    return true;
}

Current Situation:

I have tried to convert from using rxBleConnection to using the connectionObservable. It appears as though the first batch is written successfully, but even though the subscribe function returns a successfully written bytearray both times, the bluetooth device only sees the first batch

Logs

07-31 13:26:54.434 25060-25060/com.packet.sniffer I/BLE Controller: sending updated name
07-31 13:26:54.434 264-467/? I/ThermalEngine: Sensor:pa_therm1:33000 mC
07-31 13:26:54.440 25060-25060/com.packet.sniffer D/RxBle#Radio:   QUEUED RxBleRadioOperationCharacteristicWrite(99278425)
07-31 13:26:54.443 25060-25187/com.packet.sniffer D/RxBle#Radio:  STARTED RxBleRadioOperationCharacteristicWrite(99278425)
07-31 13:26:54.458 25060-25060/com.packet.sniffer D/RxBle#Radio:   QUEUED RxBleRadioOperationCharacteristicWrite(56755989)
07-31 13:26:54.611 25060-25072/com.packet.sniffer D/RxBle#BluetoothGatt: onCharacteristicWrite characteristic=569a2000-b87f-490c-92cb-11ba5ea5167c status=0
07-31 13:26:54.614 25060-25243/com.packet.sniffer D/RxBle#Radio:   QUEUED RxBleRadioOperationCharacteristicRead(32706505)
07-31 13:26:54.614 25060-25187/com.packet.sniffer D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicWrite(99278425)
07-31 13:26:54.615 25060-25187/com.packet.sniffer D/RxBle#Radio:  STARTED RxBleRadioOperationCharacteristicWrite(56755989)
07-31 13:26:54.714 25060-25071/com.packet.sniffer D/RxBle#BluetoothGatt: onCharacteristicWrite characteristic=569a2000-b87f-490c-92cb-11ba5ea5167c status=0
07-31 13:26:54.721 25060-25243/com.packet.sniffer D/RxBle#Radio:   QUEUED RxBleRadioOperationCharacteristicRead(39309874)
07-31 13:26:54.723 25060-25187/com.packet.sniffer D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicWrite(56755989)
07-31 13:26:54.724 25060-25187/com.packet.sniffer D/RxBle#Radio:  STARTED RxBleRadioOperationCharacteristicRead(32706505)
07-31 13:26:54.809 1980-2244/com.android.bluetooth I/bt_btif_gatt: set_read_value unformat.len = 20 
07-31 13:26:54.811 25060-25072/com.packet.sniffer D/RxBle#BluetoothGatt: onCharacteristicRead characteristic=569a2000-b87f-490c-92cb-11ba5ea5167c status=0
07-31 13:26:54.813 25060-25243/com.packet.sniffer I/BLE Controller: checking if tx is cleared: [B@c805018
07-31 13:26:54.813 25060-25243/com.packet.sniffer I/BLE Controller: BT has been renamed! :[B@5999a71
07-31 13:26:54.813 25060-25243/com.packet.sniffer I/BLE Controller: checking if tx is cleared: [B@c805018
07-31 13:26:54.816 25060-25187/com.packet.sniffer D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicRead(32706505)
07-31 13:26:54.817 25060-25187/com.packet.sniffer D/RxBle#Radio:  STARTED RxBleRadioOperationCharacteristicRead(39309874)
07-31 13:26:54.908 1980-2244/com.android.bluetooth I/bt_btif_gatt: set_read_value unformat.len = 20 
07-31 13:26:54.910 25060-25071/com.packet.sniffer D/RxBle#BluetoothGatt: onCharacteristicRead characteristic=569a2000-b87f-490c-92cb-11ba5ea5167c status=0
07-31 13:26:54.911 25060-25243/com.packet.sniffer I/BLE Controller: checking if tx is cleared: [B@a1f9bcf
07-31 13:26:54.911 25060-25243/com.packet.sniffer I/BLE Controller: BT has been renamed! :[B@208995c
07-31 13:26:54.911 25060-25243/com.packet.sniffer I/BLE Controller: checking if tx is cleared: [B@a1f9bcf
07-31 13:26:54.914 25060-25187/com.packet.sniffer D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicRead(39309874)
07-31 13:26:56.467 1980-2388/com.android.bluetooth D/HeadsetStateMachine: Disconnected process message: 10, size: 0

UPDATE 07/31/2017:

Updated to make previous code synchronous

byteArrayBatchObservable.flatMap(bytesBatch -> // using batches of data to write...
byteArrayBatchObservable.flatMap(bytesBatch -> // using batches of data to write...
    connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(vspT, bytesBatch)
                            .flatMap(writtenBytes -> { // ...and when each batch will be written...
                                    final Func1<byte[], Boolean> filterFunction = txBytes -> checkIfZero(txBytes);
                                    return rxBleConnection
                                            .readCharacteristic(vspT.getUuid())
                                            .repeat() // ...and repeat it...
                                            .takeUntil(filterFunction)
                                            .filter(filterFunction) // ...but don't emit anything until then...
                                            .map(readBytes -> writtenBytes); // ...and emit the writtenBytesBatch...
                                }
                            ))
, 1)

Unfortunately still not working, here are the relevant logs:

4:55:35.108 25084-25084/com.packet.sniffer I/BLE Controller: sending updated name
07-31 14:55:35.110 25084-25084/com.packet.sniffer D/RxBle#Radio:   QUEUED RxBleRadioOperationCharacteristicWrite(135224277)
07-31 14:55:35.113 25084-25209/com.packet.sniffer D/RxBle#Radio:  STARTED RxBleRadioOperationCharacteristicWrite(135224277)
07-31 14:55:35.113 25084-25084/com.packet.sniffer I/BluetoothLEController: verify connectivity
07-31 14:55:35.219 25084-25096/com.packet.sniffer D/RxBle#BluetoothGatt: onCharacteristicWrite characteristic=569a2000-b87f-490c-92cb-11ba5ea5167c status=0
07-31 14:55:35.225 25084-25243/com.packet.sniffer D/RxBle#Radio:   QUEUED RxBleRadioOperationCharacteristicRead(155469961)
07-31 14:55:35.228 25084-25209/com.packet.sniffer D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicWrite(135224277)
07-31 14:55:35.229 25084-25209/com.packet.sniffer D/RxBle#Radio:  STARTED RxBleRadioOperationCharacteristicRead(155469961)
07-31 14:55:35.316 25084-25097/com.packet.sniffer D/RxBle#BluetoothGatt: onCharacteristicRead characteristic=569a2000-b87f-490c-92cb-11ba5ea5167c status=0
07-31 14:55:35.317 25084-25243/com.packet.sniffer I/BLE Controller: checking if tx is cleared: [B@f0396a7
07-31 14:55:35.317 25084-25243/com.packet.sniffer I/BLE Controller: BT has been renamed! :[B@8983754
07-31 14:55:35.317 25084-25243/com.packet.sniffer I/BLE Controller: checking if tx is cleared: [B@f0396a7
07-31 14:55:35.320 25084-25209/com.packet.sniffer D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicRead(155469961)

Solution

  • From the documentation of setWriteOperationAckStrategy():

    If you want to delay the next batch use provided observable and add some custom behavior (delay, waiting for a message from the device, etc.)
    

    Judging from your snippet here:

    .setWriteOperationAckStrategy(new RxBleConnection.WriteOperationAckStrategy() {
        @Override
        public Observable<Boolean> call(Observable<Boolean> booleanObservable) {
            return Observable.just(true); // this is supposed to tell the LongWriteBuilder that we should continue sending data, correct?
        }
    })
    

    You are not interested in delay of the next batch. In this situation you can just leave out setting the WriteOperationAckStrategy as the documentation states: If this is not specified - the next batch of bytes is written right after the previous one has finished. Which is an equavilent to:

    .setWriteOperationAckStrategy(new RxBleConnection.WriteOperationAckStrategy() {
        @Override
        public Observable<Boolean> call(Observable<Boolean> objectObservable) {
            return objectObservable;
        }
    })
    

    Edit: To delay the next batch write there is a need to delay the ACK signal. As the Long Write operation is making sure no other operation will happen in between writes - the only possible option is to relay on characteristic notifications / indications or other side channel events.

    Edit 1: Alternative approach without using the Long Write could look like this. Consider a class that would make batches of a long byte[] that is needed to be written:

    public class ByteArrayBatchObservable extends Observable<byte[]> {
    
        public ByteArrayBatchObservable(@NonNull final byte[] bytes, final int maxBatchSize) {
            super(SyncOnSubscribe.createSingleState(
                    new Func0<ByteBuffer>() {
                        @Override
                        public ByteBuffer call() {
                            return ByteBuffer.wrap(bytes);
                        }
                    },
                    new Action2<ByteBuffer, Observer<? super byte[]>>() {
                        @Override
                        public void call(ByteBuffer byteBuffer, Observer<? super byte[]> observer) {
                            int nextBatchSize = Math.min(byteBuffer.remaining(), maxBatchSize);
                            if (nextBatchSize == 0) {
                                observer.onCompleted();
                                return;
                            }
                            final byte[] nextBatch = new byte[nextBatchSize];
                            byteBuffer.get(nextBatch);
                            observer.onNext(nextBatch);
                        }
                    }
            ));
        }
    }
    

    Then in your code you could use a similar code to:

    final ByteArrayBatchObservable byteArrayBatchObservable = new ByteArrayBatchObservable(longByteArrayToWrite, maxBatchSize); // create an observable that will make chunks of data small enough to write at once
    return byteArrayBatchObservable.flatMap(bytesBatch -> // using batches of data to write...
                    rxBleConnection.writeCharacteristic(characteristicUuid, bytesBatch) // ...write them on characteristic...
                            .flatMap(writtenBytes -> { // ...and when each batch will be written...
                                        final Func1<byte[], Boolean> filterFunction = txBytes -> txBytes.length == 1 && txBytes[0] == 0;
                                        return rxBleConnection
                                                .readCharacteristic(txCharacteristicUuid) // ...start reading the TX characteristic...
                                                .repeat() // ...and repeat it...
                                                .takeUntil(filterFunction) // ...until the read value will indicate that the device is ready for the next batch...
                                                .filter(filterFunction) // ...but don't emit anything until then...
                                                .map(readBytes -> writtenBytes); // ...and emit the writtenBytesBatch...
                                    }
                            ),
            1 // ...to be sure that only one .writeCharacteristic() will be subscribed at any given time
    );
    

    Edit 2: Per what you wanted to achieve when writing a long name:

    1. Write a batch of data
    2. Perform a read of the device readiness flag
    3. If device is not ready go back to 2.
    4. If there is still more data to write go back to 1.

    What you can see in logs:

    1. Batch of data is written
    2. Another batch of data is written
    3. Read is performed
    4. Another read is performed

    And that is exactly what you have in your code as you are not synchronizing individual batches. It is probably a copy-paste error because of the place where you have put the .flatMap(Observable, 1):

    final ByteArrayBatchObservable byteArrayBatchObservable = new ByteArrayBatchObservable(newName, 19);
    Log.i("BLE Controller","sending updated name"); 
    byteArrayBatchObservable.flatMap(bytesBatch -> // using batches of data to write...
        connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(vspT, bytesBatch) 
            .flatMap(writtenBytes -> { // ...and when each batch will be written...
                final Func1<byte[], Boolean> filterFunction = txBytes -> checkIfZero(txBytes);
                return rxBleConnection
                        .readCharacteristic(vspT.getUuid()) 
                        .repeat() // ...and repeat it...
                        .takeUntil(filterFunction)
                        .filter(filterFunction) // ...but don't emit anything until then...
                        .map(readBytes -> writtenBytes); // ...and emit the writtenBytesBatch...
            }
        ), 1)
    
    )
    

    Where this code should be only a bit different to achieve synchronous behaviour:

    final ByteArrayBatchObservable byteArrayBatchObservable = new ByteArrayBatchObservable(newName, 19);
    Log.i("BLE Controller","sending updated name"); 
    byteArrayBatchObservable.flatMap(bytesBatch -> // using batches of data to write...
        connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(vspT, bytesBatch) 
            .flatMap(writtenBytes -> { // ...and when each batch will be written...
                final Func1<byte[], Boolean> filterFunction = txBytes -> checkIfZero(txBytes);
                return rxBleConnection
                        .readCharacteristic(vspT.getUuid()) 
                        .repeat() // ...and repeat it...
                        .takeUntil(filterFunction)
                        .filter(filterFunction) // ...but don't emit anything until then...
                        .map(readBytes -> writtenBytes); // ...and emit the writtenBytesBatch...
            }
        ))
        .take(1)
    , 1)