Search code examples
javaspring-bootmqttspring-integration-mqtt

MQTT and SpringBoot Integration Connection Lost


I currently have an API in SpringBoot and I would like to add an MQTT client to subscribe to one or more topics. I tried several Paho,Hive clients, without success, I'm currently on the default MQTT of SpringBoot which uses Paho but I can't get it to work even with the basic configuration. I get a "Connection Lost" error as soon as I launch the application... Can you tell me a fix or something else that would work. Thanks! Mosquitto is running on windows for testing

Maven :

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>           
        </dependency>
....

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import lombok.extern.slf4j.Slf4j;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Slf4j
@SpringBootApplication
@EnableSwagger2
public class MainApiSpring {

    public static void main(String[] args) {

        SpringApplication.run(MainApiSpring.class, args);
        log.trace("L'application a correctement été démarrée.");

    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                "test/topic");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);

        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }
}

The error on run :

2020-09-04 10:31:39.099 ERROR 4244 --- [           main] .m.i.MqttPahoMessageDrivenChannelAdapter : Exception while connecting and subscribing, retrying

org.eclipse.paho.client.mqttv3.MqttException: Connexion perdue
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
Caused by: java.io.EOFException: null
    at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272) ~[na:na]
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    ... 1 common frames omitted

More log


Solution

  • Answer : This works with MqttOptions defined !

        @Bean
        public MqttConnectOptions getReceiverMqttConnectOptions() {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(true);
            mqttConnectOptions.setConnectionTimeout(30);
            mqttConnectOptions.setKeepAliveInterval(60);
            mqttConnectOptions.setAutomaticReconnect(true);
    
    //      mqttConnectOptions.setUserName("myemail");
            String password = "mypassword!";
    //      String hostUrl = "tcp://maqiatto.com:1883";
            String hostUrl = "tcp://localhost:1883";
    //      mqttConnectOptions.setPassword(password.toCharArray());
            mqttConnectOptions.setServerURIs(new String[] { hostUrl });
            return mqttConnectOptions;
        }
    
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getReceiverMqttConnectOptions());
            return factory;
        }
    
        @Bean
        public MessageProducer inbound() {
            String clientId2 = "uuid-" + UUID.randomUUID().toString();
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId2,
    //              mqttClientFactory(), "myemail/test");
                    mqttClientFactory(), "test", "test/paho");
            adapter.setCompletionTimeout(20000);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(2);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }