To start with, my MQTT solution worked very well with the client I used previously, org.eclipse.paho.android.service.MqttAndroidClient,
but I recently decided to switch to org.eclipse.paho.client.mqttv3.MqttAsyncClient.
Since then, the following problems have started:
The UI is not updated after notifyDataSetChanged() is called on my adapter inside messageArrived(),
and when notifyDataSetChanged() is called, this error is raised: android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.
Calling notifyDataSetChanged() from a Runnable passed to runOnUiThread():
// Attempted fix: hand the adapter update to the hosting activity's UI thread,
// so that both the list mutation and the notify call run there.
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
//...
// Append the newly parsed device object, then ask the adapter to redraw.
sonoff_devices_arrayList.add( sonoff_obj );
sonoff_adapter_rv.notifyDataSetChanged();
}
});
causes the app to crash with:
Attempt to read from field 'androidx.recyclerview.widget.ViewInfoStore androidx.recyclerview.widget.RecyclerView.mViewInfoStore' on a null object reference
However, when the app is put into the background and brought back to the foreground, the UI surprisingly gets updated immediately after the app is resumed. I have no idea why.
I am sure I am doing something wrong when trying to update the UI the same way I used to with MqttAndroidClient.
My helper class (used below as mqtt_helper), which provides basic MqttAsyncClient connectivity:
package cz.removed;
import android.content.Context;
import android.util.Log;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
/**
 * Small convenience wrapper around Paho's {@link MqttAsyncClient}.
 *
 * <p>Builds a client and its {@link MqttConnectOptions} from the stored user
 * preferences and exposes connect / disconnect / subscribe / unsubscribe /
 * publish helpers that only log their outcome.
 *
 * <p>All operations are fully asynchronous: the result listener is handed to
 * Paho together with the request, so no helper blocks the calling thread.
 * Attaching the listener after waiting on the token (as was done before) is
 * racy, because the operation has already completed by then and the listener
 * may never be invoked.
 */
public class MQTThelper {
    /** Options built by {@link #getMqttCLIENT()} and reused by {@link #MqttConnect(MqttAsyncClient)}. */
    MqttConnectOptions MQTT_CONNECTION_OPTIONS;
    PreferenceData preferenceData;
    Context context;

    /**
     * @param context context used to read the stored preferences and the
     *                bundled CA certificate resource
     */
    public MQTThelper(Context context) {
        this.context = context;
        preferenceData = new PreferenceData(context);
    }

    /**
     * Creates a new, not yet connected client for the configured broker and
     * (re)builds {@link #MQTT_CONNECTION_OPTIONS} as a side effect.
     *
     * @return a freshly allocated client with a generated client id
     * @throws RuntimeException wrapping the {@link MqttException} if the client
     *                          cannot be instantiated
     */
    public MqttAsyncClient getMqttCLIENT() {
        final String brokerHost = preferenceData.getUsersStringData("key_mqtt_server_host");
        final String brokerUri = "ssl://" + brokerHost;

        final MqttAsyncClient client;
        try {
            client = new MqttAsyncClient(brokerUri, MqttAsyncClient.generateClientId(), new MemoryPersistence());
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }

        MQTT_CONNECTION_OPTIONS = new MqttConnectOptions();
        // NOTE(review): a non-clean session combined with a generated client id
        // looks like it can never be resumed by a later client - confirm intent.
        MQTT_CONNECTION_OPTIONS.setCleanSession(false);
        MQTT_CONNECTION_OPTIONS.setKeepAliveInterval(30);

        // Optional login
        final String username = preferenceData.getUsersStringData("key_mqtt_server_username");
        final String password = preferenceData.getUsersStringData("key_mqtt_server_password");
        MQTT_CONNECTION_OPTIONS.setUserName(username);
        if (password != null) {
            // Login is optional: a missing password must not crash client creation.
            MQTT_CONNECTION_OPTIONS.setPassword(password.toCharArray());
        }

        if (brokerUri.startsWith("ssl")) {
            SocketFactory.SocketFactoryOptions socketFactoryOptions = new SocketFactory.SocketFactoryOptions();
            try {
                socketFactoryOptions.withCaInputStream(context.getResources().openRawResource(R.raw.ca_root));
                MQTT_CONNECTION_OPTIONS.setSocketFactory(new SocketFactory(socketFactoryOptions));
            } catch (IOException | NoSuchAlgorithmException | KeyStoreException | CertificateException
                    | KeyManagementException | UnrecoverableKeyException e) {
                // Best effort, as before: the client is still returned without the custom factory.
                Log.e("MQTT", "Failed to configure the CA socket factory", e);
            }
        }

        System.out.print("Allocated CLIENT in helpers -> getMqttCLIENT: " + client.getClientId());
        return client;
    }

    /**
     * Starts an asynchronous connect using {@link #MQTT_CONNECTION_OPTIONS}.
     * Returns immediately; the outcome is logged from the listener.
     *
     * @param CLIENT client to connect
     */
    public void MqttConnect(MqttAsyncClient CLIENT) {
        try {
            System.out.println("MQTT client id in helpers: " + CLIENT.getClientId());
            CLIENT.connect(MQTT_CONNECTION_OPTIONS, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d("MQTT:", "connected, token: " + asyncActionToken.toString());
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d("MQTT:", "not connected " + asyncActionToken.toString(), exception);
                }
            });
        } catch (MqttException e) {
            Log.e("MQTT", "Connect request failed", e);
        }
    }

    /**
     * Starts an asynchronous disconnect. Does nothing when the client is
     * {@code null} or not connected.
     *
     * @param CLIENT client to disconnect, may be {@code null}
     */
    void MqttDisconnect(MqttAsyncClient CLIENT) {
        if (CLIENT == null || !CLIENT.isConnected()) {
            return;
        }
        try {
            CLIENT.disconnect(null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d("mqtt:", "disconnected");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d("mqtt:", "couldnt disconnect", exception);
                }
            });
        } catch (MqttException e) {
            Log.e("MQTT", "Disconnect request failed", e);
        }
    }

    /**
     * Starts an asynchronous subscription.
     *
     * @param CLIENT connected client
     * @param topic  topic filter to subscribe to
     * @param qos    requested quality of service
     */
    public void subscribe(MqttAsyncClient CLIENT, String topic, byte qos) {
        try {
            CLIENT.subscribe(topic, qos, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d("MQTT", "successfully subscribed: " + asyncActionToken.toString() + "topic: " + topic + ", with QOS:" + qos);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d("MQTT", "subscribing error to: " + topic, exception);
                }
            });
        } catch (MqttException e) {
            Log.e("MQTT", "Subscribe request failed for " + topic, e);
        }
    }

    /**
     * Starts an asynchronous unsubscribe. Does nothing when the client is
     * {@code null} or not connected.
     *
     * @param CLIENT client to unsubscribe with, may be {@code null}
     * @param topic  topic filter to unsubscribe from
     */
    public void unsubscribe(MqttAsyncClient CLIENT, String topic) {
        if (CLIENT == null || !CLIENT.isConnected()) {
            return;
        }
        try {
            CLIENT.unsubscribe(topic, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d("mqtt:", "unsubcribed from " + topic);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d("mqtt:", "couldnt unregister", exception);
                }
            });
        } catch (MqttException e) {
            Log.e("MQTT", "Unsubscribe request failed for " + topic, e);
        }
    }

    /**
     * Publishes a message asynchronously. {@code msg} takes precedence over
     * {@code jmsg}; when both are {@code null} the payload is left at the
     * {@link MqttMessage} default.
     *
     * @param CLIENT     connected client
     * @param topic      topic to publish to
     * @param jmsg       JSON payload, used only when {@code msg} is {@code null}
     * @param msg        plain-text payload, may be {@code null}
     * @param isRetained whether the broker should retain the message
     * @param qos        quality of service for the message
     */
    public void publish(MqttAsyncClient CLIENT, String topic, JSONObject jmsg, String msg, boolean isRetained, int qos) {
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(qos);
            message.setRetained(isRetained);
            if (msg != null) {
                message.setPayload(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            } else if (jmsg != null) {
                message.setPayload(jmsg.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            CLIENT.publish(topic, message, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d("mqtt:", "publish done -> " + asyncActionToken.toString());
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.d("mqtt:", "publish error -> " + asyncActionToken.toString(), exception);
                }
            });
        } catch (Exception e) {
            // Boundary catch kept from the original: publishing must never crash the caller.
            Log.e("MQTT", "Publish request failed for " + topic, e);
        }
    }
}
The class that uses the MQTT helper above:
package cz.removed;
import android.widget.Toast;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.appcompat.app.AlertDialog;
import androidx.core.content.ContextCompat;
import androidx.fragment.app.Fragment;
import androidx.recyclerview.widget.GridLayoutManager;
import androidx.recyclerview.widget.LinearLayoutManager;
import androidx.recyclerview.widget.RecyclerView;
import androidx.swiperefreshlayout.widget.SwipeRefreshLayout;
import androidx.viewpager2.widget.ViewPager2;
import com.google.android.material.snackbar.Snackbar;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
public class EFragment extends Fragment implements MqttCallbackExtended {
MqttAsyncClient CLIENT;
isMQTTEnabled = True;
/**
 * Obtains a fresh MQTT client from the helper and registers this fragment
 * as the receiver of its callbacks.
 */
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    // Allocate the client first, then route its callbacks to this fragment.
    this.CLIENT = mqtt_helper.getMqttCLIENT();
    this.CLIENT.setCallback(this);
}
/**
 * Builds the list of topic filters that subscribeMQTT() / unsubscribeMQTT()
 * iterate over.
 */
@Override
public void onViewCreated(View view, @Nullable Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
// '#' is the MQTT multi-level wildcard: each filter matches everything below its prefix.
TOPIC = new ArrayList();
TOPIC.add( "a/discovery/#" );
TOPIC.add( "b/#" );
// NOTE(review): the remainder of this method (including its closing brace)
// appears to be missing from this excerpt - confirm against the full file.
/**
 * Drops all topic subscriptions when the fragment leaves the foreground.
 */
@Override
public void onPause() {
    super.onPause();
    if (!isMQTTEnabled) {
        return;
    }
    // Unsubscribe MQTT
    unsubscribeMQTT();
}
/**
 * No fragment-specific work on detach; only delegates to the superclass.
 */
@Override
public void onDetach() {
super.onDetach();
}
@Override
public void onResume() {
super.onResume();
if( isMQTTEnabled ) {
try{
if ( CLIENT !=null && ! CLIENT.isConnected()){
mqtt_helper.MqttConnect(CLIENT);
}
else{
subscribeMQTT();
}
}
}
}
/**
 * Disconnects the MQTT client once the fragment is no longer visible.
 */
@Override
public void onStop() {
    super.onStop();
    if (!isMQTTEnabled) {
        return;
    }
    mqtt_helper.MqttDisconnect(CLIENT);
}
/**
 * Unsubscribes from every topic filter in TOPIC; a no-op when MQTT is disabled.
 */
public void unsubscribeMQTT() {
    if (!isMQTTEnabled) {
        return;
    }
    for (Object topicFilter : TOPIC) {
        mqtt_helper.unsubscribe(CLIENT, topicFilter.toString());
    }
}
/**
 * Subscribes to every topic filter in TOPIC with QoS 0, logging each one first.
 */
public void subscribeMQTT() {
    for (Object topicFilter : TOPIC) {
        final String filter = topicFilter.toString();
        System.out.println("subscribing to " + filter);
        mqtt_helper.subscribe(CLIENT, filter, (byte) 0);
    }
}
/**
 * MQTT callback for a dropped connection. Intentionally empty here:
 * the loss is neither logged nor acted upon.
 */
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void messageArrived(String topic, final MqttMessage message) throws Exception {
Log.i( "MQTT_1: " + topic, "message:" + message.toString() );
try {
_obj = new JSONObject( message.toString());
//
// a lot of JSON adapter structure logic code ommited , but the logic works well for Synch MQTT client
//...
_devices_arrayList.add( _obj );
_adapter_rv.notifyDataSetChanged(); /
}
} catch (JSONException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
 * MQTT callback invoked when delivery of a published message has completed;
 * only prints a confirmation line.
 */
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("MQTT Message publish OK!");
}
/**
 * MQTT callback invoked once a connection to the broker is established.
 * Subscribes to all configured topics; the reconnect flag and server URI
 * are not used.
 */
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("MQTT connectComplete");
// Subscriptions are (re)issued on every completed connect.
subscribeMQTT();
}
/**
 * RecyclerView adapter that renders one grid cell per received device JSON object.
 *
 * <p>The adapter keeps a reference to (not a copy of) the list passed to the
 * constructor, so callers mutate that list and then call one of the
 * {@code notify*} methods on the main thread.
 */
public class myRecyclerViewAdapter extends RecyclerView.Adapter<myRecyclerViewAdapter.ViewHolder> {
    private final ArrayList<JSONObject> mData;
    private final LayoutInflater mInflater;
    private boolean SwitchFallbackValue;
    private boolean SwitchToggleCanceled;

    /**
     * @param context context used to obtain the layout inflater
     * @param data    backing list of device objects; may be {@code null}, which
     *                is treated as empty
     */
    myRecyclerViewAdapter(Context context, ArrayList<JSONObject> data) {
        this.mInflater = LayoutInflater.from(context);
        this.mData = data;
    }

    /** Inflates the cell layout from XML when a new holder is needed. */
    @Override
    @NonNull
    public ViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
        View view = mInflater.inflate(R.layout.fragment_grid, parent, false);
        return new ViewHolder(view);
    }

    /**
     * Binds the device at {@code position} to the cell. A missing or malformed
     * entry leaves the cell untouched and is logged.
     */
    @Override
    public void onBindViewHolder(@NonNull final ViewHolder holder, int position) {
        // The list already holds parsed objects; no need to serialize and re-parse per bind.
        final JSONObject device = mData.get(position);
        if (device == null) {
            return;
        }
        try {
            // RSSI is nested inside the "Wifi" object.
            holder.tv_rssi.setText(device.getJSONObject("Wifi").getString("RSSI"));
        } catch (JSONException e) {
            Log.e("MQTT", "Device JSON at position " + position + " has no Wifi.RSSI", e);
        }
    }

    /** @return total number of cells; 0 when there is no backing list */
    @Override
    public int getItemCount() {
        return mData == null ? 0 : mData.size();
    }

    /** Holds the views of a single grid cell. */
    public class ViewHolder extends RecyclerView.ViewHolder {
        TextView tv_rssi;

        ViewHolder(View itemView) {
            super(itemView);
            tv_rssi = itemView.findViewById(R.id.grid_rssi);
        }
    }

    @Override
    public long getItemId(int position) {
        return position;
    }

    // NOTE(review): using the position as the view type gives every row its own
    // type, which looks like it prevents holders from being reused across rows -
    // confirm this is intentional before changing it.
    @Override
    public int getItemViewType(int position) {
        return position;
    }

    /**
     * Convenience accessor for the data at a clicked position.
     *
     * @param id adapter position
     * @return the device object stored at that position
     */
    public Object getItem(int id) {
        return mData.get(id);
    }
}
}
After further debugging I can see that all MQTT messages arrive on time and show up correctly in my log, so the issue is not related to MQTT client-server communication but to something else.
Any ideas?
Eventually, what worked was to run the following adapter changes in a Runnable posted to the main looper:
// Post the UI work to the main looper so it runs on the thread that owns the views.
new Handler(getActivity().getMainLooper()).post(new Runnable() {
    @Override
    public void run() {
        // Either call works here: notifyItemChanged( i ) or notifyDataSetChanged().
        _adapter_rv.notifyItemChanged( i );
        // setLayoutManager must be run inside the Runnable too.
        RecyclerView.setLayoutManager( new GridLayoutManager( context, columns ) );
    }
});