Search code examples
javaspringspring-integrationmqttspring-integration-dsl

How can we have different handlers to subscribe messages from different topics using spring Integration?


I wrote my first spring integration application which uses the mqtt broker to subscribe messages from different topics which are coming from a device. The device is publishing the messages and the client(Code) is accessing those messages using same topics.

I added a handler for accessing the messages coming from the broker and use it further in classes. Now, in my case, I want to have different handlers for different topics so that they can all be mapped to different VO classes and use it further in business logic.

As I know, I want to create only one connection to the broker, one channel but different topics can come and they should be handled in different handlers for the same connection. How Can I achieve that?

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
//        SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder(MqttJavaApplication.class);
        SpringApplication.run(MqttJavaApplication.class,args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String clientId = "uuid-" + UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId,"camera/status");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
//        adapter.setOutputChannelName("mqttInputChannel");
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    public IntegrationFlow mqttInFlow() {
        System.out.println(Arrays.stream(SubScribeMessages.class.getMethods()).findFirst());
        return IntegrationFlows.from(inbound())
                .transform(p -> p)
                .handle("addTopics","handlHere")
                .get();
    }

    @Component
    public class MyService{

        @Autowired
        MqttPahoMessageDrivenChannelAdapter adapter;

        @Bean
        public String addTopics()
        {
            if(adapter.getTopic().length>0)
            {
                adapter.addTopic("camera/+/counts"); //new topic 
                adapter.addTopic("camera/+/live_counts"); //new topic
            }
            return "";
        }

        // topic "camera/+/counts" is handled here but other messages also come here, how do we handle other topics in separate handlers?
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public void handleHere(@Payload Object mess) throws JsonProcessingException {
            String[]  topics = adapter.getTopic();
            for(String topic:topics)
                System.out.println(topic); // How can I get topic name which is using a wildcard?
            ObjectMapper objectMapper = new ObjectMapper();
            String json=mess.toString();
            System.out.println(json);
            CountVo countVo = objectMapper.readValue(json, CountVo.class);
            if (!countVo.equals(null))
                System.out.println(countVo.getIrisysCounts().get(0).getName());

        }
    }

}

Additional Question

How Can I get the full topic name when using a wildcard? The actual topic which was published but caught by wildcard.

Please help.


Solution

  • Add a router (.route(...)); you can route on the MqttHeaders.RECEIVED_TOPIC header (which contains the topic name) to different flows for each topic.

    https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#messaging-routing-chapter

    EDIT

    The simplest router is to simply map the topic names to channel names. Here is an example:

    @SpringBootApplication
    public class So67391175Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So67391175Application.class, args);
        }
    
        @Bean
        public DefaultMqttPahoClientFactory pahoClientFactory() {
            DefaultMqttPahoClientFactory pahoClientFactory = new DefaultMqttPahoClientFactory();
            MqttConnectOptions connectionOptions = new MqttConnectOptions();
            connectionOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
            pahoClientFactory.setConnectionOptions(connectionOptions);
            return pahoClientFactory;
        }
    
        @Bean
        public IntegrationFlow mqttInFlow(DefaultMqttPahoClientFactory pahoClientFactory) {
            return IntegrationFlows.from(
                    new MqttPahoMessageDrivenChannelAdapter("testClient",
                            pahoClientFactory, "topic1", "topic2"))
                    .route("headers['" + MqttHeaders.RECEIVED_TOPIC + "']")
                    .get();
        }
    
        @Bean
        public IntegrationFlow flow1() {
            return IntegrationFlows.from("topic1")
                    .handle((payload, headers) -> {
                        System.out.println("message from topic1 " + payload + ": " + headers);
                        return null;
                    })
                    .get();
        }
    
        @Bean
        public IntegrationFlow flow2() {
            return IntegrationFlows.from("topic2")
                    .handle((payload, headers) -> {
                        System.out.println("message from topic2 " + payload + ": " + headers);
                        return null;
                    })
                    .get();
        }
    
    }
    

    message from topic1 test: {mqtt_receivedRetained=false, mqtt_id=1, mqtt_duplicate=false, id=1d950bce-aa47-7e3b-1a0d-e4d01ed707de, mqtt_receivedTopic=topic1, mqtt_receivedQos=1, timestamp=1620250633090}

    message from topic2 test: {mqtt_receivedRetained=false, mqtt_id=2, mqtt_duplicate=false, id=7e9c3f51-c148-2b18-3588-ed27e93dae19, mqtt_receivedTopic=topic2, mqtt_receivedQos=1, timestamp=1620250644602}