Search code examples
rxandroidble

When multiple devices connect at the same time, my app crashes with an error


When multiple devices connect at the same time, my app crashes with the below error. Why is this, and how can I resolve it?

When I force devices to connect sequentially after scanning, it works nicely.

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:60)
        at android.os.Handler.handleCallback(Handler.java:739)
        at android.os.Handler.dispatchMessage(Handler.java:95)
        at android.os.Looper.loop(Looper.java:135)
        at android.app.ActivityThread.main(ActivityThread.java:5312)
        at java.lang.reflect.Method.invoke(Native Method)
        at java.lang.reflect.Method.invoke(Method.java:372)
        at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:901)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:696)
    Caused by: rx.exceptions.OnErrorNotImplementedException
        at rx.Observable$27.onError(Observable.java:7923)
        at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
        at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
        at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
        at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
        at rx.internal.operators.CachedObservable$CacheState.onError(CachedObservable.java:201)
        at rx.internal.operators.CachedObservable$CacheState$1.onError(CachedObservable.java:175)
        at rx.observers.Subscribers$5.onError(Subscribers.java:225)
        at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
        at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onError(SubjectSubscriptionManager.java:227)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
        at rx.subjects.ReplaySubject$UnboundedReplayState.accept(ReplaySubject.java:465)
        at rx.subjects.ReplaySubject$UnboundedReplayState.replayObserverFromIndex(ReplaySubject.java:514)
        at rx.subjects.ReplaySubject$UnboundedReplayState.replayObserver(ReplaySubject.java:502)
        at rx.subjects.ReplaySubject.caughtUp(ReplaySubject.java:427)
        at rx.subjects.ReplaySubject.onError(ReplaySubject.java:387)
        at com.polidea.rxandroidble.internal.RxBleRadioOperation.onError(RxBleRadioOperation.java:84)
        at com.polidea.rxandroidble.internal.RxBleRadioOperation$1.onError(RxBleRadioOperation.java:50)
        at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
        at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
        at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:71)
        at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
        at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
        at rx.internal.operators.OperatorSingle$ParentSubscriber.onError(OperatorSingle.java:139)
        at rx.internal.operators.OperatorTake$1.onError(OperatorTake.java:62)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:810)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.accept(SubjectSubscriptionManager.java:318)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitLoop(SubjectSubscriptionManager.java:291)
        at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitFirst(SubjectSubscriptionManager.java:270)
        at rx.subjects.BehaviorSubject$1.call(BehaviorSubject.java:106)
        at rx.subjects.BehaviorSubject$1.call(BehaviorSubject.java:102)
        at rx.subjects.SubjectSubscriptionManager.add(SubjectSubscriptionManager.java:95)
        at rx.subjects.SubjectSubscriptionManager.call(SubjectSubscriptionManager.java:60)
        at rx.subjects.SubjectSubscriptionManager.call(SubjectSubscriptionManager.java:35)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8098)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:232)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastpath(OnSubscribeFromIterable.java:127)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:70)
        at rx.Subscriber.setProducer(Subscriber.java:211)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.subscribe(Observable.java:8191)
        at rx.Observable.subscribe(Observable.java:8158)
        at com.polidea.rxandroidble.internal.operations.RxBleRadioOperationServicesDiscover.lambda$run$71(RxBleRadioOperationServicesDiscover.java:34)
        at com.polidea.rxandroidble.internal.operations.RxBleRadioOperationServicesDiscover.access$lambda$0(RxBleRadioOperationServicesDiscover.java)
        at com.polidea.rxandroidble.internal.operations.RxBleRadioOperationServicesDiscover$$Lambda$1.call(Unknown Source)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.subscribe(Observable.java:8191)
        at rx.Observable.subscribe(Observable.java:8158)
        at com.polidea.rxandroidble.internal.operations.RxBleRadioOperationServicesDiscover.run(RxBleRadioOperationServicesDiscover.java:44)
        at com.polidea.rxandroidble.internal.radio.RxBleRadioImpl$$Lambda$4.call(Unknown Source)
        at rx.Observable$27.onNext(Observable.java:7928)
        at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:139)
        at rx.internal.util.ScalarSynchronousObservable$ScalarSynchronousAction.call(ScalarSynchronousObservable.java:115)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
        ... 8 more
    Caused by: BleGattException{status=22, bleGattOperation=BleGattOperation{description='CONNECTION_STATE'}}
        at com.polidea.rxandroidble.internal.connection.RxBleGattCallback.propagateStatusErrorIfGattErrorOccurred(RxBleGattCallback.java:245)
        at com.polidea.rxandroidble.internal.connection.RxBleGattCallback.access$100(RxBleGattCallback.java:26)
        at com.polidea.rxandroidble.internal.connection.RxBleGattCallback$1.onConnectionStateChange(RxBleGattCallback.java:62)
        at android.bluetooth.BluetoothGatt$1.onClientConnectionState(BluetoothGatt.java:181)
        at android.bluetooth.IBluetoothGattCallback$Stub.onTransact(IBluetoothGattCallback.java:70)
        at android.os.Binder.execTransact(Binder.java:446)

------- Code Class --

package com.ths.bts.bt;

import android.bluetooth.BluetoothDevice;
import android.bluetooth.BluetoothGatt;
import android.bluetooth.BluetoothGattCharacteristic;
import android.bluetooth.BluetoothGattDescriptor;
import android.bluetooth.BluetoothGattService;
import android.os.Bundle;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;

import com.polidea.rxandroidble.RxBleConnection;
import com.polidea.rxandroidble.RxBleDevice;
import com.polidea.rxandroidble.RxBleDeviceServices;
import com.ths.bts.utils.Defs;
import com.ths.bts.utils.Utility;

import java.util.HashMap;
import java.util.List;
import java.util.UUID;

import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.subjects.PublishSubject;


public class RXClass {

    private String macAddress;
    private RxBleDevice rxBleDevice;
    private RxBleConnection connection;
    private rx.Observable<RxBleConnection> connectionObservable;
    private Subscription subscription;
    private BluetoothDevice device;
    private BleProfileManager bleProfileManager;
    private Handler mActionHandler = null;
    private PublishSubject<Void> disconnectTriggerSubject = null;
    private HashMap<UUID,BluetoothGattCharacteristic> charMap = new HashMap<>();
    public BleProfileManager getBleProfileManager() {
        return bleProfileManager;
    }

    public PublishSubject<Void> getDisconnectTriggerSubject() {
        return disconnectTriggerSubject;
    }

    public void setDisconnectTriggerSubject(PublishSubject<Void> disconnectTriggerSubject) {
        this.disconnectTriggerSubject = disconnectTriggerSubject;
    }

    public void setBleProfileManager(BleProfileManager bleProfileManager) {
        this.bleProfileManager = bleProfileManager;
    }

    public BluetoothDevice getDevice() {
        return device;
    }

    public void setDevice(BluetoothDevice device) {
        this.device = device;
    }

    public Subscription getSubscription() {
        return subscription;
    }

    public void setSubscription(Subscription subscription) {
        this.subscription = subscription;
    }

    public Observable<RxBleConnection> getConnectionObservable() {
        return connectionObservable;
    }

    public void setConnectionObservable(Observable<RxBleConnection> connectionObservable) {
        this.connectionObservable = connectionObservable;
    }

    public String getMacAddress() {
        return macAddress;
    }

    public void setMacAddress(String macAddress) {
        this.macAddress = macAddress;
    }

    public RxBleDevice getRxBleDevice() {
        return rxBleDevice;
    }

    public void setRxBleDevice(RxBleDevice rxBleDevice) {
        this.rxBleDevice = rxBleDevice;
    }

    public RxBleConnection getConnection() {
        return connection;
    }

    public void setConnection(RxBleConnection connection) {
        this.connection = connection;
    }

    public RXClass(String address)
    {
        this.macAddress = address;
        mActionHandler = new Handler(Looper.getMainLooper());
    }




    public byte[] getSecurityCommand(String address)
    {
        String second = address.substring(3, 5);
        String fifth = address.substring(12, 14);
        String command = fifth.trim() + "3663" + second.trim();
        byte b[] = hexStringToByteArray(command);

        return b;
    }

    public synchronized byte[] hexStringToByteArray(String s) {
        int len = s.length();
        byte[] data = new byte[(len / 2)];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i + 1), 16));
        }
        return data;
    }

    public void readAllChars(List<BluetoothGattService> services)
    {
        for (BluetoothGattService service : services)
        {
            if(service.getCharacteristics().size() > 0 && !service.getCharacteristics().get(0).getUuid().equals(Defs.CHAR_ALERT_LEVEL_UUID)
                    && !service.getCharacteristics().get(0).getUuid().equals(Defs.CHAR_CUSTOM_SERVICE_UUID))
            {
                int props = service.getCharacteristics().get(0).getProperties();
                if ((props & 2) != 0) {
                    charMap.put(service.getCharacteristics().get(0).getUuid(), service.getCharacteristics().get(0));
                    readCharacterisitc(service.getCharacteristics().get(0).getUuid());
                }



            }

        }
    }

    public void connect()
    {

    }

    public void disconnect()
    {
        if(subscription != null) {
            subscription.unsubscribe();

            Utility.infoLog("NANI", "Disconnecting  - " + this.device.getAddress() + " - " + rxBleDevice.getConnectionState());
            if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
            {
                mActionHandler.post(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                            Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                            Bundle bundle = new Bundle();
                            bundle.putString("eventType", "ondisconnect");
                            Message msg = Message.obtain(handler, Defs.HDLR_MSG_DEVICE_DISCONNECT);
                            bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                            msg.setData(bundle);
                            msg.sendToTarget();
                        }
                    }
                });

            }
        }





    }


    public int getConnectionState()
    {
        if(this.rxBleDevice != null)
        {
            if(rxBleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED)
                return bleProfileManager.STATE_PROFILE_CONNECTED;
            if(rxBleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.DISCONNECTED)
                return bleProfileManager.STATE_PROFILE_DISCONNECTED;
            if(rxBleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTING)
                return bleProfileManager.STATE_PROFILE_STATE_CONNECTING;
            if(rxBleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.DISCONNECTING)
                return bleProfileManager.STATE_PROFILE_STATE_DISCONNECTING;
        }
        return bleProfileManager.STATE_PROFILE_DISCONNECTED;
    }

    public void unscribe()
    {
        if(subscription != null && subscription.isUnsubscribed())
            subscription.unsubscribe();


    }

    private void triggerDisconnect() {
        disconnectTriggerSubject.onNext(null);
    }


    public void readRemoteRssi() {
        connection.readRssi()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::onRSSIReadSuccess, this::onRSSIReadFailure);
    }

    public void readCharacterisitc(UUID iCharUuid)
    {
        connection.readCharacteristic(iCharUuid)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::onReadSuccess, this::onReadFailure);
    }

    public void writeCharacteristic(UUID iCharUuid,
                                    byte[] data)
    {
        connection.writeCharacteristic(iCharUuid, data)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::onWriteSuccess, this::onWriteFailure);
    }

    public void writeCharacteristic(UUID iCharUuid,
                                    int data)
    {
        byte b[] = new byte[1];
        b[0]  = (byte)data;
        connection.writeCharacteristic(iCharUuid, b)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::onWriteSuccess, this::onWriteFailure);
    }

    public void enableNotification(UUID iCharUuid)
    {
        connection.setupNotification(iCharUuid)
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(notificationObservable -> notificationObservable)
                .subscribe(this::onNotificationReceived, this::onNotificationSetupFailure);

    }

    public BluetoothGattCharacteristic getCharacteristic(UUID iCharUuid)
    {
        return charMap.get(iCharUuid);
    }




    // Call backs

    public void onConnectionFailure(Throwable throwable) {
        Utility.infoLog("NANI", "Connection Failed" + this.macAddress);

        if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
        {
            mActionHandler.post(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                        Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                        Bundle bundle = new Bundle();
                        bundle.putString("eventType", "ondisconnect");
                        Message msg = Message.obtain(handler, Defs.HDLR_MSG_DEVICE_CONNECTION_FAILED);
                        bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                        msg.setData(bundle);
                        msg.sendToTarget();
                    }
                }
            });

        }

    }

    public void onConnectionReceived(RxBleConnection connection) {
        Utility.infoLog("NANI", "Connection Success - " + this.macAddress);
        this.connection = connection;
        connection.discoverServices().subscribe(this::onServiceDiscovered);
        if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
        {
            mActionHandler.post(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                        Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                        Bundle bundle = new Bundle();
                        bundle.putString("eventType", "onConnectionStateChange");
                        Message msg = Message.obtain(handler, Defs.HDLR_MSG_DEVICE_CONNECT);
                        bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                        msg.setData(bundle);
                        msg.sendToTarget();
                    }
                }
            });

        }



    }


    public void onConnectionStateChange(RxBleConnection.RxBleConnectionState newState) {
        //Utility.infoLog("NANI", "Connection Status - " + newState);

    }

    public void onServiceDiscovered(RxBleDeviceServices services) {
        Utility.infoLog("NANI", "Services Success - " + this.macAddress);
        writeCharacteristic(Defs.CHAR_CUSTOM_CONNECTION_UUID, getSecurityCommand(this.macAddress));
        if(services != null)
        {
            readAllChars(services.getBluetoothGattServices());
        }


        if(charMap.get(Defs.CHAR_BIOSENSOR_NOTIFY_UUID) != null)
            readCharacterisitc(Defs.CHAR_BIOSENSOR_NOTIFY_UUID);

        if(charMap.get(Defs.CHAR_BIOSENSOR_NOTIFY_UUID) != null)
            readCharacterisitc(Defs.CHAR_BIOSENSOR_NOTIFY_UUID);

        if(charMap.get(Defs.CHAR_BIOSENSOR_CONNECTION_UUID) != null)
            writeCharacteristic(Defs.CHAR_BIOSENSOR_CONNECTION_UUID, getSecurityCommand(this.macAddress));

        if(charMap.get(Defs.CHAR_CUSTOM_CONNECTION_UUID) != null)
            writeCharacteristic(Defs.CHAR_CUSTOM_CONNECTION_UUID, getSecurityCommand(this.macAddress));

        if(charMap.get(Defs.CHAR_BIOSENSOR_NOTIFY_UUID) != null)
            enableNotification(Defs.CHAR_BIOSENSOR_NOTIFY_UUID);

        if(charMap.get(Defs.CHAR_AUTOMATION_IO) != null)
            enableNotification(Defs.CHAR_AUTOMATION_IO);

        try{ConnectionManager.resetWait(device.getAddress());}catch(Exception e){e.printStackTrace();}

        if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
        {
            mActionHandler.post(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                        Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                        Bundle bundle = new Bundle();
                        bundle.putString("eventType", "onServicesDiscovered");
                        Message msg = Message.obtain(handler, Defs.HDLR_MSG_SERVICE_DISCOVERY_DONE);
                        bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                        msg.setData(bundle);
                        msg.sendToTarget();
                    }
                }
            });

        }
    }

    public void onReadFailure(Throwable throwable) {

    }

    public void onReadSuccess(byte[] b) {
        if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
        {
            mActionHandler.post(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                        Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                        Bundle bundle = new Bundle();
                        bundle.putString("eventType", "oncharread");
                        Message msg = Message.obtain(handler, Defs.HDLR_MSG_GATT_ON_CHAR_READ);
                        bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                        msg.setData(bundle);
                        msg.sendToTarget();
                    }
                }
            });

        }
    }

    public void onWriteSuccess(byte[] b) {
        if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
        {
            mActionHandler.post(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                        Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                        Bundle bundle = new Bundle();
                        bundle.putString("eventType", "oncharwrite");
                        Message msg = Message.obtain(handler, Defs.HDLR_MSG_GATT_ON_CHAR_WRITE);
                        bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                        msg.setData(bundle);
                        msg.sendToTarget();
                    }
                }
            });

        }
    }


    public void onNotificationReceived(byte[] bytes) {
        Utility.infoLog("NANI", "Noification Recevied" + this.macAddress);
        if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
        {

            mActionHandler.post(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                        Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                        Bundle bundle = new Bundle();
                        bundle.putString("eventType", "oncharwrite");
                        Message msg = Message.obtain(handler, Defs.HDLR_MSG_DEVICE_CLIP_UPDATE);
                        bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                        msg.setData(bundle);
                        msg.sendToTarget();
                    }
                }
            });
        }
    }

    private void notificationHasBeenSetUp() {

    }

    public void onNotificationSetupFailure(Throwable throwable) {
        Utility.infoLog("NANI", "Noification Failed" + this.macAddress);

    }

    public void onRSSIReadSuccess(int rssi) {
        Defs.B_DEVICES_RSSI_MAP.put(this.macAddress, new Integer(rssi));
        if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
        {

            mActionHandler.post(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                        Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                        Bundle bundle = new Bundle();
                        bundle.putString("eventType", "oncharwrite");
                        Message msg = Message.obtain(handler, Defs.HDLR_MSG_DEVICE_READ_SSI);
                        bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                        msg.setData(bundle);
                        msg.sendToTarget();
                    }
                }
            });
        }


    }
    public void onRSSIReadFailure(Throwable throwable) {

    }


    public void onWriteFailure(Throwable throwable) {

    }
}

Solution

  • I have skimmed through the code and I have found a connection.discoverServices().subscribe(this::onServiceDiscovered); without error handling in:

    public void onConnectionReceived(RxBleConnection connection) {
        Utility.infoLog("NANI", "Connection Success - " + this.macAddress);
        this.connection = connection;
        connection.discoverServices().subscribe(this::onServiceDiscovered);
        if(bleProfileManager != null && bleProfileManager.getRemoteHandlers() != null )
        {
            mActionHandler.post(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < bleProfileManager.getRemoteHandlers().size(); i++) {
                        Handler handler = bleProfileManager.getRemoteHandlers().get(i);
                        Bundle bundle = new Bundle();
                        bundle.putString("eventType", "onConnectionStateChange");
                        Message msg = Message.obtain(handler, Defs.HDLR_MSG_DEVICE_CONNECT);
                        bundle.putParcelable(BluetoothDevice.EXTRA_DEVICE, device);
                        msg.setData(bundle);
                        msg.sendToTarget();
                    }
                }
            });
        }
    }
    

    Additionally you have a mistake in:

    public void unscribe()
    {
        if(subscription != null && subscription.isUnsubscribed())
            subscription.unsubscribe();
    }
    

    It should be if(subscription != null && !subscription.isUnsubscribed()) because otherwise it will never unsubscribe.