I tried the app below, in which two clients continuously send messages and a third one receives them. It seems that the messageArrived method is called on a single thread: as soon as the handling of a message from one client gets stuck, the messages from the other client are no longer received either. Is this an issue with my implementation, or is this the expected behavior? Please advise. My code is shown below.
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Receiving side of the demo: connects to the local broker as client
 * "Receiving", subscribes to "foo1" and "foo2", and then starts the two
 * publishers ({@code ClientPub2} for "foo2", {@code ClientPub1} for "foo1").
 *
 * <p>The handler for "foo1" deliberately never returns, to show what happens
 * to delivery on the other topic when one callback invocation blocks.
 */
public class TestMain implements MqttCallback {
    static TestMain t = new TestMain();
    static MqttClient client;
    static Timer timer = new Timer();
    static Calendar cal = Calendar.getInstance();
    static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    static int count = 0;

    /**
     * Connects the receiving client, subscribes to both topics and starts
     * the two publishers. Any {@link MqttException} is printed and ends setup.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int qos = 2;
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            // Hand the in-memory persistence to the client; previously it was
            // created but never used.
            client = new MqttClient("tcp://localhost:1883", "Receiving", persistence);

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            // Install the callback before connecting so that no event can
            // arrive while no listener is registered.
            client.setCallback(t);
            // Actually apply the options; previously connect() ignored them.
            client.connect(connOpts);

            // Subscribe with the same QoS the publishers use instead of the
            // library default.
            client.subscribe(new String[] { "foo1", "foo2" }, new int[] { qos, qos });

            ClientPub2 c = new ClientPub2();
            c.sendMessage();
            ClientPub1 c1 = new ClientPub1();
            c1.sendMessage();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reports a dropped connection instead of ignoring it.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.err.println("Receiving: connection lost: " + cause);
    }

    /**
     * Handles an incoming message.
     *
     * <p>For "foo1" this intentionally spins forever to simulate a stuck
     * handler. Because the library delivers incoming messages on a single
     * thread, no further message (including "foo2") is delivered once this
     * branch is entered. To process messages in parallel, hand the message
     * off to a worker pool and return from this method immediately.
     *
     * @param topic   topic the message was published on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        if (topic.equalsIgnoreCase("foo1")) {
            // Deliberate: simulates a handler that never returns.
            while (true) {
                int i = 0;
            }
        } else if (topic.equalsIgnoreCase("foo2")) {
            System.out.println("Received message: foo2 : " + message);
        }
    }

    /**
     * No-op: this client does not publish, so there is nothing to track.
     *
     * @param token delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Intentionally empty.
    }
}
Another client sending messages:
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/**
 * Publisher that connects to the local broker as client "Sending" and, once
 * {@link #sendMessage()} is called, publishes a numbered message to topic
 * "foo2" once per second.
 */
public class ClientPub2 implements MqttCallback {
    MqttClient client;
    int qos = 2;
    Timer timer = new Timer();
    public static int sendCount = 0;

    /**
     * Creates the client, connects to tcp://localhost:1883 and subscribes to
     * "foo2". Failures are printed rather than thrown; if the client cannot
     * be created, {@code client} stays {@code null}.
     */
    public ClientPub2() {
        // A single try block: if client creation fails we must not go on to
        // call connect() on a null reference.
        try {
            client = new MqttClient("tcp://localhost:1883", "Sending");

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            // Register the callback before connecting, then connect with the
            // options that were built (previously they were never applied).
            client.setCallback(this);
            client.connect(connOpts);
            client.subscribe("foo2");
        } catch (MqttException e) {
            // MqttSecurityException is a subclass, so one handler covers both.
            e.printStackTrace();
        }
    }

    /**
     * Starts publishing to "foo2" immediately and then every 1000 ms.
     * Each tick increments {@link #sendCount} and sends a fresh message
     * carrying that count. Does nothing if the client was never created.
     */
    public void sendMessage() {
        if (client == null) {
            System.err.println("foo2 : client was not created; nothing will be published");
            return;
        }
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                sendCount++;
                String text = "A single message from my computer fff " + sendCount;
                System.out.println("foo2 : Sending message " + text);

                // Build a new message per tick instead of mutating a shared one.
                MqttMessage message =
                        new MqttMessage(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                message.setQos(qos);
                try {
                    client.publish("foo2", message);
                } catch (MqttException e) {
                    // Covers MqttPersistenceException as well (subclass).
                    e.printStackTrace();
                }
            }
        }, 0, 1000);
    }

    /**
     * Reports a dropped connection instead of ignoring it.
     *
     * @param arg0 the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable arg0) {
        System.err.println("Sending: connection lost: " + arg0);
    }

    /**
     * No-op: completed deliveries are not tracked.
     *
     * @param arg0 delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // Intentionally empty.
    }

    /**
     * No-op: this client only publishes; received messages are ignored.
     *
     * @param arg0 topic the message was published on
     * @param arg1 the received message
     */
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        // Intentionally empty.
    }
}
The second client sending messages:
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/**
 * Publisher that connects to the local broker as client "Sending2" and, once
 * {@link #sendMessage()} is called, publishes a numbered message to topic
 * "foo1" once per second.
 */
public class ClientPub1 implements MqttCallback {
    MqttClient client;
    int qos = 2;
    Timer timer = new Timer();
    public static int sendCount = 0;

    /**
     * Creates the client, connects to tcp://localhost:1883 and subscribes to
     * "foo1". Failures are printed rather than thrown; if the client cannot
     * be created, {@code client} stays {@code null}.
     */
    public ClientPub1() {
        // A single try block: if client creation fails we must not go on to
        // call connect() on a null reference.
        try {
            client = new MqttClient("tcp://localhost:1883", "Sending2");

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            // Register the callback before connecting, then connect with the
            // options that were built (previously they were never applied).
            client.setCallback(this);
            client.connect(connOpts);
            client.subscribe("foo1");
        } catch (MqttException e) {
            // MqttSecurityException is a subclass, so one handler covers both.
            e.printStackTrace();
        }
    }

    /**
     * Reports a dropped connection instead of ignoring it.
     *
     * @param arg0 the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable arg0) {
        System.err.println("Sending2: connection lost: " + arg0);
    }

    /**
     * Starts publishing to "foo1" immediately and then every 1000 ms.
     * Each tick increments {@link #sendCount} and sends a fresh message
     * carrying that count. Does nothing if the client was never created.
     */
    public void sendMessage() {
        if (client == null) {
            System.err.println("foo1 : client was not created; nothing will be published");
            return;
        }
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                sendCount++;
                String text = "A single message from my computer fff " + sendCount;
                System.out.println("foo1 : Sending message " + text);

                // Build a new message per tick instead of mutating a shared one.
                MqttMessage message =
                        new MqttMessage(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                message.setQos(qos);
                try {
                    client.publish("foo1", message);
                } catch (MqttException e) {
                    // Covers MqttPersistenceException as well (subclass).
                    e.printStackTrace();
                }
            }
        }, 0, 1000);
    }

    /**
     * No-op: completed deliveries are not tracked.
     *
     * @param arg0 delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // Intentionally empty.
    }

    /**
     * No-op: this client only publishes; received messages are ignored.
     *
     * @param arg0 topic the message was published on
     * @param arg1 the received message
     */
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        // Intentionally empty.
    }
}
You are correct, the MQTT library runs a single thread to deliver incoming messages.
If you want to handle incoming messages in parallel, you will need to implement your own local queue and thread pool, so that messageArrived only hands the message off and returns immediately. The java.util.concurrent package has the tools to build this; the java.util.concurrent.ThreadPoolExecutor class is a good starting point.