I wrote my first Spring Integration application, which uses an MQTT broker to subscribe to messages on different topics published by a device. The device publishes the messages, and the client (my code) receives them by subscribing to the same topics.
I added a handler that receives the messages from the broker and passes them on to other classes. Now I want a different handler for each topic, so that each topic's messages can be mapped to its own VO class and used in the business logic.
As far as I know, I should create only one connection to the broker and one channel, but messages from different topics should be handled by different handlers on that same connection. How can I achieve that?
@SpringBootApplication
public class MqttJavaApplication {
    public static void main(String[] args) {
        // SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder(MqttJavaApplication.class);
        SpringApplication.run(MqttJavaApplication.class, args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String clientId = "uuid-" + UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId, "camera/status");
        adapter.setConverter(new DefaultPahoMessageConverter());
        // adapter.setOutputChannelName("mqttInputChannel");
        return adapter;
    }

    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(inbound())
                .transform(p -> p)
                .channel("mqttInputChannel") // feeds the channel read by the service activator
                .get();
    }
}
@Service
public class MyService {

    @Autowired
    MqttPahoMessageDrivenChannelAdapter adapter;

    public String addTopics() {
        adapter.addTopic("camera/+/counts"); // new topic
        adapter.addTopic("camera/+/live_counts"); // new topic
        return "";
    }

    // topic "camera/+/counts" is handled here but other messages also come here, how do we handle other topics in separate handlers?
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleHere(@Payload Object mess) throws JsonProcessingException {
        String[] topics = adapter.getTopic();
        for (String topic : topics) {
            System.out.println(topic); // How can I get topic name which is using a wildcard?
        }
        ObjectMapper objectMapper = new ObjectMapper();
        String json = mess.toString();
        CountVo countVo = objectMapper.readValue(json, CountVo.class);
        if (countVo != null) { // equals(null) can never be true; compare the reference instead
            // use countVo in the business logic
        }
    }
}
Additional Question
How can I get the full topic name when subscribing with a wildcard, i.e. the actual topic the message was published to and that the wildcard matched?
Please help.
Add a router (.route(...)); you can route on the MqttHeaders.RECEIVED_TOPIC header (which contains the topic name) to different flows for each topic.
The simplest router maps the topic names directly to channel names. Here is an example:
@SpringBootApplication
public class So67391175Application {
    public static void main(String[] args) {
        SpringApplication.run(So67391175Application.class, args);
    }

    @Bean
    public DefaultMqttPahoClientFactory pahoClientFactory() {
        DefaultMqttPahoClientFactory pahoClientFactory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions connectionOptions = new MqttConnectOptions();
        connectionOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
        pahoClientFactory.setConnectionOptions(connectionOptions);
        return pahoClientFactory;
    }

    // One adapter (one connection) subscribes to both topics; the router uses the received topic as the channel name.
    @Bean
    public IntegrationFlow mqttInFlow(DefaultMqttPahoClientFactory pahoClientFactory) {
        return IntegrationFlows.from(
                new MqttPahoMessageDrivenChannelAdapter("testClient",
                        pahoClientFactory, "topic1", "topic2"))
                .route("headers['" + MqttHeaders.RECEIVED_TOPIC + "']")
                .get();
    }

    @Bean
    public IntegrationFlow flow1() {
        return IntegrationFlows.from("topic1")
                .handle((payload, headers) -> {
                    System.out.println("message from topic1 " + payload + ": " + headers);
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow flow2() {
        return IntegrationFlows.from("topic2")
                .handle((payload, headers) -> {
                    System.out.println("message from topic2 " + payload + ": " + headers);
                    return null;
                })
                .get();
    }
}
message from topic1 test: {mqtt_receivedRetained=false, mqtt_id=1, mqtt_duplicate=false, id=1d950bce-aa47-7e3b-1a0d-e4d01ed707de, mqtt_receivedTopic=topic1, mqtt_receivedQos=1, timestamp=1620250633090}
message from topic2 test: {mqtt_receivedRetained=false, mqtt_id=2, mqtt_duplicate=false, id=7e9c3f51-c148-2b18-3588-ed27e93dae19, mqtt_receivedTopic=topic2, mqtt_receivedQos=1, timestamp=1620250644602}
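With wildcard subscriptions such as camera/+/counts from the question, the mqtt_receivedTopic header carries the concrete topic the message arrived on (for example camera/42/counts, an illustrative name), so it will not equal a channel named after the filter. One possible way to bridge that gap is to match the received topic against the subscribed filters and return a channel name. The following is a minimal plain-Java sketch of that idea; TopicRouter, countsChannel, liveCountsChannel and unmatchedChannel are hypothetical names, and the matching is simplified (it ignores the special handling of topics that start with $).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Maps a concrete MQTT topic to a channel name by matching it against topic filters. */
final class TopicRouter {

    // Topic filter -> channel name. The channel names are hypothetical.
    private static final Map<String, String> ROUTES = new LinkedHashMap<>();
    static {
        ROUTES.put("camera/+/counts", "countsChannel");
        ROUTES.put("camera/+/live_counts", "liveCountsChannel");
    }

    /** Returns true if the topic matches the filter ('+' = exactly one level, '#' = all remaining levels). */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard also matches the parent level
            }
            if (i >= t.length) {
                return false; // topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length; // no unmatched trailing levels in the topic
    }

    /** Returns the channel name of the first matching filter, or a hypothetical fallback channel. */
    static String channelFor(String receivedTopic) {
        for (Map.Entry<String, String> route : ROUTES.entrySet()) {
            if (matches(route.getKey(), receivedTopic)) {
                return route.getValue();
            }
        }
        return "unmatchedChannel";
    }

    public static void main(String[] args) {
        System.out.println(channelFor("camera/42/counts"));
        System.out.println(channelFor("camera/42/live_counts"));
        System.out.println(channelFor("camera/status"));
    }
}
```

In the flow, the SpEL router could probably be swapped for something like .route(Message.class, m -> TopicRouter.channelFor(m.getHeaders().get(MqttHeaders.RECEIVED_TOPIC, String.class))), with one flow starting from each returned channel name. Treat that wiring as an untested assumption rather than a verified configuration.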