We are using Spring Integration to subscribe to an MQTT topic, but we get the error below very frequently. I tested the connection with a JavaScript client (mqttws31.js) and it works fine, so there seems to be no issue with the broker connection itself.
Error:
org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter connectionLost
SEVERE: Lost connection:Connection lost; retrying...
MQTT message:
[payload=6483D03E4C75BA943148F18D73,1.00,1E, headers={mqtt_retained=false, mqtt_qos=0, id=5fa41168-34c6-1e3d-a775-e3146842990a, mqtt_topic=TEST/GATEWAY2, mqtt_duplicate=false, timestamp=1499067757559}]
Configuration:
<bean id="clientFactory"
    class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="userName" value="${mqtt.username}" />
    <property name="password" value="${mqtt.password}" />
</bean>
<int-mqtt:message-driven-channel-adapter
    id="mqttInbound" client-id="${mqtt.default.client.id}" url="${mqtt.url}"
    topics="${topics}" client-factory="clientFactory" auto-startup="true"
    channel="output" error-channel="errorChannel" />
<int:channel id="output" />
<int:channel id="errorChannel" />
<int:service-activator input-channel="errorChannel"
    ref="errorMessageLogger" method="logError" />
<bean id="errorMessageLogger" class="com.mqtt.ErrorMessageLogger" />
<int:service-activator input-channel="output"
    method="handleMessage" ref="mqttLogger" />
<bean id="mqttLogger" class="com.mqtt.MqttReciever" />
pom.xml:
While debugging org.eclipse.paho.client.mqttv3-1.1.1-sources.jar, I found this method:
public void run() {
    final String methodName = "run";
    MqttToken token = null;
    while (running && (in != null)) {
        try {
            //@TRACE 852=network read message
            receiving = in.available() > 0;
            MqttWireMessage message = in.readMqttWireMessage();
            receiving = false;
            // instanceof checks if message is null
            if (message instanceof MqttAck) {
                token = tokenStore.getToken(message);
                if (token!=null) {
                    synchronized (token) {
                        // Ensure the notify processing is done under a lock on the token
                        // This ensures that the send processing can complete before the
                        // receive processing starts! ( request and ack and ack processing
                        // can occur before request processing is complete if not!
                    }
                } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
                    //This is an ack for a message we no longer have a ticket for.
                    //This probably means we already received this message and it's being send again
                    //because of timeouts, crashes, disconnects, restarts etc.
                    //It should be safe to ignore these unexpected messages.
                    log.fine(CLASS_NAME, methodName, "857");
                } else {
                    // It its an ack and there is no token then something is not right.
                    // An ack should always have a token assoicated with it.
                    throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                }
            } else {
                if (message != null) {
                    // A new message has arrived
                }
            }
        }
        catch (MqttException ex) {
            //@TRACE 856=Stopping, MQttException
            running = false;
            // Token maybe null but that is handled in shutdown
            clientComms.shutdownConnection(token, ex);
        }
        catch (IOException ioe) {
            //@TRACE 853=Stopping due to IOException
            running = false;
            // An EOFException could be raised if the broker processes the
            // DISCONNECT and ends the socket before we complete. As such,
            // only shutdown the connection if we're not already shutting down.
            if (!clientComms.isDisconnecting()) {
                clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
            }
        }
        finally {
            receiving = false;
        }
    }
    //@TRACE 854=<
}
In the method above, in.readMqttWireMessage() sometimes throws an IOException. The catch block then shuts the connection down with clientComms.shutdownConnection(token, ...), which is what produces the "Connection lost" error and the reconnect.
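Since the receiver wraps the IOException in an MqttException with REASON_CODE_CONNECTION_LOST, the log message alone ("Connection lost") does not say what actually went wrong on the socket. One way to narrow it down is to walk the cause chain of the Throwable that reaches connectionLost and look at the innermost exception. The sketch below is only an illustration: the class and method names (ConnectionLostCause, rootCause, describe) are made up, and it uses plain JDK exceptions instead of Paho classes so it runs without the library.

```java
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;

// Hypothetical helper, not part of Paho or Spring Integration.
final class ConnectionLostCause {

    // Follow getCause() until the innermost exception is reached.
    static Throwable rootCause(Throwable t) {
        if (t == null) {
            return null;
        }
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // Turn the innermost exception into a short, human-readable hint.
    static String describe(Throwable t) {
        Throwable root = rootCause(t);
        if (root == null) {
            return "no cause available";
        }
        if (root instanceof EOFException) {
            return "remote end closed the socket (EOF)";
        }
        // SocketTimeoutException is an IOException, so check it first.
        if (root instanceof SocketTimeoutException) {
            return "socket read timed out";
        }
        if (root instanceof IOException) {
            return "I/O error: " + root.getMessage();
        }
        return "non-I/O failure: " + root;
    }

    public static void main(String[] args) {
        // Stand-in for the wrapped exception built in the catch block above.
        Throwable wrapped = new RuntimeException("Connection lost", new EOFException());
        System.out.println(describe(wrapped));
    }
}
```

An EOFException at the bottom of the chain would suggest that the broker closed the socket on its side, which points at broker-side reasons rather than a network fault on the client.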
Just wanted to share in case it helps...
I had the same exception and fixed it by making sure every client connects with a unique client ID (generated with MqttAsyncClient.generateClientId()), as mentioned here:
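The reason this matters: when a client connects with an ID that is already in use, the MQTT broker must disconnect the existing client. Two clients sharing one ID (for example a test client and the Spring adapter, or two instances of the same application) will therefore keep kicking each other off. If you would rather build the ID yourself than call MqttAsyncClient.generateClientId(), something along these lines should work. It is a minimal sketch using only the JDK; the class name, the method name, and the 10-character prefix cap are my own choices, not anything from Paho or Spring.

```java
import java.util.UUID;

// Hypothetical helper for building per-instance client IDs.
final class ClientIdSketch {

    // MQTT 3.1 limits client IDs to 23 characters; 3.1.1 brokers must accept
    // at least that many, so staying within the limit is the safe choice.
    static final int MAX_LENGTH = 23;

    // Arbitrary cap so that enough room is left for the random part.
    static final int MAX_PREFIX = 10;

    static String uniqueClientId(String prefix) {
        String p = prefix.length() > MAX_PREFIX ? prefix.substring(0, MAX_PREFIX) : prefix;
        // 32 hex characters of randomness, trimmed to fill the remaining space.
        String random = UUID.randomUUID().toString().replace("-", "");
        return p + "-" + random.substring(0, MAX_LENGTH - p.length() - 1);
    }

    public static void main(String[] args) {
        // Two calls give two different IDs with the same readable prefix.
        System.out.println(uniqueClientId("gateway"));
        System.out.println(uniqueClientId("gateway"));
    }
}
```

The generated value would replace the fixed client-id in the adapter configuration. One trade-off to keep in mind: persistent sessions are tied to the client ID, so a fresh random ID on every start means the broker will not resume a previous session for that client.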