Search code examples
androidandroid-recyclerviewmqtt

How to correctly populate MQTT data from MqttAsyncClient to app UI ? (RecyclerView , SnackBar issues)


I would like to start with information that my MQTT solution has been working very good with previous org.eclipse.paho.android.service.MqttAndroidClient but recently I decided to switch to ->> org.eclipse.paho.client.mqttv3.MqttAsyncClient.

Since then following problems started:

  • RecyclerView that uses MQTT data collected at messageArrived() is not updated after notifyDataSetChanged() is called in my adapter setup and when notifyDataSetChanged() is called, error is received:

android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.

Calling notifyDataSetChanged() from Runnable:

    getActivity().runOnUiThread(new Runnable() {
    @Override
    public void run() {
        //...
        sonoff_devices_arrayList.add( sonoff_obj );
        sonoff_adapter_rv.notifyDataSetChanged();
        }   
    });

causing app to crash:

Attempt to read from field 'androidx.recyclerview.widget.ViewInfoStore androidx.recyclerview.widget.RecyclerView.mViewInfoStore' on a null object reference

  • MQTT message triggered SnackBar is not showed on the UI (but after app is put background and foreground, it is)
  • Some apparently unrelated minor UI changes are not propagated as single navigation-drawer remains highlighted unexpectedly

As said when app is put to background and back to foreground UI surprisingly gets updated immediately after app is resumed. I have no idea why.

I am sure I am doing something wrong when trying to update UI same way as I was use to with MqttAndroidClient

My class mqtt_helper providing MqttAsyncClient basic connectivity:

package cz.removed;

import android.content.Context;
import android.util.Log;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;

public class MQTThelper  {

    MqttConnectOptions MQTT_CONNECTION_OPTIONS;
    PreferenceData preferenceData;
    Context context;

    public MQTThelper(Context context){
        this.context = context;
        preferenceData =  new PreferenceData( context );
    }

    public MqttAsyncClient getMqttCLIENT(){
        //MQTT connection stuff
        MqttAsyncClient CLIENT;
        String BROKER_HOST = preferenceData.getUsersStringData(  "key_mqtt_server_host" ); //preferenceData gathered
        String BROKER = "ssl://" + BROKER_HOST;

        try {
            CLIENT = new MqttAsyncClient(BROKER, MqttAsyncClient.generateClientId(), new MemoryPersistence());
        } catch (MqttException e) {
            throw new RuntimeException( e );
        }
        MQTT_CONNECTION_OPTIONS = new MqttConnectOptions();
        MQTT_CONNECTION_OPTIONS.setCleanSession(false);
        MQTT_CONNECTION_OPTIONS.setKeepAliveInterval(30);

        //Optional login
        String USERNAME = preferenceData.getUsersStringData( "key_mqtt_server_username" ); //preferenceData gathered
        String PASSWORD = preferenceData.getUsersStringData(  "key_mqtt_server_password" ); //preferenceData gathered

        MQTT_CONNECTION_OPTIONS.setUserName(USERNAME);
        MQTT_CONNECTION_OPTIONS.setPassword(PASSWORD.toCharArray());
        MQTT_CONNECTION_OPTIONS.setCleanSession( false );


        if (BROKER.contains("ssl")) {
            SocketFactory.SocketFactoryOptions socketFactoryOptions = new SocketFactory.SocketFactoryOptions();
            try {
                socketFactoryOptions.withCaInputStream(context.getResources().openRawResource( R.raw.ca_root));
                MQTT_CONNECTION_OPTIONS.setSocketFactory(new SocketFactory(socketFactoryOptions));
            } catch (IOException | NoSuchAlgorithmException | KeyStoreException | CertificateException | KeyManagementException | UnrecoverableKeyException e) {
                e.printStackTrace();
            }
        }
        System.out.print("Allocated CLIENT in helpers -> getMqttCLIENT: " + CLIENT.getClientId());
        return CLIENT;
    }

    public void MqttConnect( MqttAsyncClient CLIENT) {

        try {
            System.out.println("MQTT client id in helpers: " + CLIENT.getClientId());
            final IMqttToken token = CLIENT.connect(MQTT_CONNECTION_OPTIONS);
            token.waitForCompletion(3500);
            token.setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    //connected
                    Log.d("MQTT:", "connected, token: " + asyncActionToken.toString());
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    // Something went wrong
                    Log.d("MQTT:", "not connected " + asyncActionToken.toString());
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    void MqttDisconnect(MqttAsyncClient CLIENT) {

        if(CLIENT != null && CLIENT.isConnected() ) {
            try {
                IMqttToken disconToken = CLIENT.disconnect();
                disconToken.setActionCallback( new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        Log.d( "mqtt:", "disconnected" );
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken,
                                          Throwable exception) {


                        Log.d( "mqtt:", "couldnt disconnect" );
                    }
                } );
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    public void subscribe(MqttAsyncClient CLIENT, String topic, byte qos) {

        try {
            IMqttToken subToken = CLIENT.subscribe(topic, qos);
            subToken.setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d("MQTT", "successfully subscribed: " + asyncActionToken.toString() + "topic: " + topic + ", with QOS:" + qos);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d("MQTT", "subscribing error to: " + topic);
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void unsubscribe(MqttAsyncClient CLIENT, String topic) {

        //unsubscribe MQTT
        if(CLIENT != null && CLIENT.isConnected()) {

            try {
                IMqttToken unsubToken = CLIENT.unsubscribe( topic );
                unsubToken.setActionCallback( new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {

                        Log.d( "mqtt:", "unsubcribed from " + topic );
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken,
                                          Throwable exception) {

                        Log.d( "mqtt:", "couldnt unregister" );
                    }
                } );
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    public void publish(MqttAsyncClient CLIENT,String topic,JSONObject jmsg,String msg,boolean isRetained,int qos) {

        try {
            MqttMessage message = new MqttMessage();
            message.setQos(qos);
            message.setRetained(isRetained);
            if(msg !=  null ){
                message.setPayload(msg.getBytes());
            }
            else if (jmsg != null ){
                message.setPayload(jmsg.toString().getBytes());
            }
            IMqttToken token = CLIENT.publish( topic, message );
            token.waitForCompletion();

            token.setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d("mqtt:", "publish done -> " + asyncActionToken.toString());
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d("mqtt:", "publish error -> " + asyncActionToken.toString());
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

Class using above MQTT helper:

package cz.removed;
import android.widget.Toast;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.appcompat.app.AlertDialog;
import androidx.core.content.ContextCompat;
import androidx.fragment.app.Fragment;
import androidx.recyclerview.widget.GridLayoutManager;
import androidx.recyclerview.widget.LinearLayoutManager;
import androidx.recyclerview.widget.RecyclerView;
import androidx.swiperefreshlayout.widget.SwipeRefreshLayout;
import androidx.viewpager2.widget.ViewPager2;
import com.google.android.material.snackbar.Snackbar;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class EFragment extends Fragment implements MqttCallbackExtended {

    MqttAsyncClient CLIENT;
    isMQTTEnabled = True;

    @Override
    public void onCreate (Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        CLIENT = mqtt_helper.getMqttCLIENT();
        CLIENT.setCallback( this );
    }

    @Override
    public void onViewCreated(View view, @Nullable Bundle savedInstanceState) {

        super.onViewCreated(view, savedInstanceState);


        //# Means subscribe to everything
        TOPIC = new ArrayList();
        TOPIC.add( "a/discovery/#" ); 
        TOPIC.add( "b/#" );

    @Override
    public void onPause() {
        super.onPause();
        //Unsubscribe MQTT
        if(isMQTTEnabled){
            unsubscribeMQTT();
        }
    }

    @Override
    public void onDetach() {
        super.onDetach();
    }

    @Override
    public void onResume() {

        super.onResume();

        if( isMQTTEnabled ) {
            try{
                if ( CLIENT !=null && ! CLIENT.isConnected()){
                    mqtt_helper.MqttConnect(CLIENT);
                }
                else{
                    subscribeMQTT();
                }
            }
        }
    }

    @Override
    public void onStop() {
        super.onStop();
        if (isMQTTEnabled) {
            mqtt_helper.MqttDisconnect(CLIENT);
        }
    }

    public void unsubscribeMQTT() {
        if (isMQTTEnabled) {
            for (int i = 0; i < TOPIC.size(); i++) {
                mqtt_helper.unsubscribe(CLIENT, TOPIC.get( i ).toString() );
            }
        }
    }
    public void subscribeMQTT() {
        for (int i = 0; i < TOPIC.size(); i++) {
            System.out.println("subscribing to " + TOPIC.get( i ).toString());
            mqtt_helper.subscribe(CLIENT, TOPIC.get( i ).toString(), (byte) 0 );
        }
    }

    @Override
    public void connectionLost(Throwable cause) {

    }

    @Override
    public void messageArrived(String topic, final MqttMessage message) throws Exception {
        Log.i( "MQTT_1: " + topic, "message:" + message.toString() );

        try {
            _obj = new JSONObject( message.toString());            
            //
            // a lot of JSON adapter structure logic code ommited , but the logic works well for Synch MQTT client
            //...
            
            _devices_arrayList.add( _obj );
            _adapter_rv.notifyDataSetChanged(); /
            }
        } catch (JSONException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("MQTT Message publish OK!");
    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        System.out.println("MQTT connectComplete");
        subscribeMQTT();
    }

    public class myRecyclerViewAdapter extends RecyclerView.Adapter<myRecyclerViewAdapter.ViewHolder> {

        private final ArrayList mData;
        private final LayoutInflater mInflater;
        private boolean SwitchFallbackValue;
        private boolean SwitchToggleCanceled;


        // data is passed into the constructor
        myRecyclerViewAdapter(Context context, ArrayList<JSONObject> data) {
            this.mInflater = LayoutInflater.from(context);
            this.mData = data;
        }

        // inflates the cell layout from xml when needed
        @Override
        @NonNull
        public ViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
            View view = mInflater.inflate(R.layout.fragment_grid, parent, false);

            return new ViewHolder(view);
        }

        // binds the data to the TextView in each cell
        @Override
        public void onBindViewHolder(@NonNull final ViewHolder holder, int position) {

            try {
                JSONObject _temp_json_object = new JSONObject( String.valueOf( mData.get( position ) ) );

                //Example of updating UI in RecyclerViewAdapter
                //RSSI as Wifi object
                holder.tv_rssi.setText( _temp_json_object.getJSONObject( "Wifi" ).getString( "RSSI" ));

            } catch (JSONException e) {
                e.printStackTrace();
                }
        }

        // total number of cells
        @Override
        public int getItemCount() {
            if (mData == null){
                return 0;
            }
            else{
                return  mData.size();
            }
        }

        public class ViewHolder extends RecyclerView.ViewHolder {
            TextView tv_rssi;
            ViewHolder(View itemView) {
                super(itemView);
                tv_rssi = itemView.findViewById(R.id.grid_rssi);
            }
        }

        @Override
        public long getItemId(int position) {
            return position;
        }

        @Override
        public int getItemViewType(int position) {
            return position;
        }

        // convenience method for getting data at click position
        public Object getItem(int id) {
            return id ;
        }

    }
}

After further debugging I see that all MQTT messages are arriving on-time and I can clearly see them correctly in my log, so this the issue is not related to MQTT client server communication, but something else.

any ideas?


Solution

  • Eventually what worked was to execute following adapter changes part on Runnable:

    new Handler(getActivity().getMainLooper()).post(new Runnable() {
      @Override
      public void run() {
        _adapter_rv.notifyItemChanged( i ); <- or also _adapter_rv.notifyDataSetChanged();
        RecyclerView.setLayoutManager( new GridLayoutManager( context, columns ); <- setLayoutManager must be run on Runnable too.
        }
      });