Search code examples
androidandroid-servicemqttpaho

Why MQTT service is blocking activity?


My app from MainActivity onCreate(), starts a service which connects to the server and subscribes to topics. Also when I add a new connection, my service restarts (using stop/startService). I store connection data (ip, port, etc.) in SQL database and pool it in service once it starts. The problem is (I think) that when one of connection parameters is incorrect the service is waiting for timeout and is blocking an activity... If I set token.waitForCompletion(500); it goes much faster but I can't guess that value...

Is there way to solve my problem?

@Override
public void onCreate() {

    Datapool();
    IntentFilter intentf = new IntentFilter();
    intentf.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
    registerReceiver(mqttBroadcastReceiver, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
    mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    deviceId = String.format(DEVICE_ID_FORMAT, Settings.Secure.getString(getContentResolver(), Settings.Secure.ANDROID_ID));

}

MQTTBroadcastReceiver mqttBroadcastReceiver = new MQTTBroadcastReceiver();

class MQTTBroadcastReceiver extends BroadcastReceiver {
    @Override
    public void onReceive(Context context, Intent intent) {
        Connect();
    }

};

IMqttToken token;
int i = 0;
private HashMap<String, Boolean> _hashMap = new HashMap<>();
private void Connect(){
    for (ServiceDataModel connectionData : dataModels) {

        Log.d(TAG, "doConnect() " + connectionData.getCONNECTION_NAME());
        _hashMap.put(connectionData.getCONNECTION_NAME(), false);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        i++;
        try {
            mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i , new MemoryPersistence());
            token = mqttClient.connect();
            token.waitForCompletion(2500);
            if (mqttClient.isConnected()) {
                    mqttClient.setCallback(new MqttEventCallback());
                    token = mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()));
                    token.waitForCompletion(2500);
                   _hashMap.put(connectionData.getCONNECTION_NAME(), true);
            }
        }catch (Exception ex){
            Log.d(TAG, ex.toString() + connectionData.toString());
        }
    }
    sendMessageToActivity(_hashMap);
}

Solution

  • By calling token.waitForCompletion(2500) you are trying to synchronize first the connection and then the subscription - which blocks the main thread.

    Do not apply these hacks, but use the asynchronous connection callbacks (mCallback and mConnectionCallback below) instead. And after connection succeeds, use the asynchronous subscription callback (mSubscribeCallback below):

    private final MqttCallbackExtended mCallback = new MqttCallbackExtended() {
        @Override
        public void connectComplete(boolean reconnect, String brokerAddress) {
                mqttClient.subscribe(connectionData.getTOPIC(), Integer.parseInt(connectionData.getQOS()), null, mSubscribeCallback);
        }
    
        @Override
        public void connectionLost(Throwable ex) {
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken deliveryToken) {
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        }
    };
    
    private final IMqttActionListener mConnectionCallback = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            // do nothing, this case is handled in mCallback.connectComplete()
        }
    
        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        }
    };
    
    private final IMqttActionListener mSubscribeCallback = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken subscribeToken) {
               _hashMap.put(connectionData.getCONNECTION_NAME(), true);
        }
    
        @Override
        public void onFailure(IMqttToken subscribeToken, Throwable ex) {
        }
    
    };
    
    try {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setCleanSession(true);
        connectOptions.setAutomaticReconnect(false);
        connectOptions.setUserName("username");
        connectOptions.setPassword("password".toCharArray());
    
        mqttClient = new MqttAsyncClient("tcp://" + connectionData.getSERVER_IP() + ":" + connectionData.getSERVER_PORT(), deviceId + i , new MemoryPersistence());
        mqttClient.setCallback(mCallback);
        mqttClient.connect(connectOptions, null, mConnectionCallback);
    
    } catch (Exception ex) {
        Log.d(TAG, ex.toString() + connectionData.toString());
    }
    

    So rearrange your app a bit and it won't block.

    Also consider using LocalBroadcastManager to communicate between SQLite, MQTT and the Activity.

    UPDATE: Instead of LocalBroadcastManager use LiveData and Room