Search code examples
springspring-integrationspring-integration-dslspring-integration-mqtt

Spring Integration + MQTT parallelization


I wrote a Spring application using Integration flows that reads some MQTT messages and puts them in incomingMqttMessageChannel:

  @Bean
  public IntegrationFlow incomingMqttMessageFlow() {
    return IntegrationFlows.from(mqttPahoMessageDrivenChannelAdapter())
        .channel("incomingMqttMessageChannel").get();
  }

  public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
        mqttBroker, UUID.randomUUID().toString(), incomingMqttTopic);
    //...
  }

  //...

And then I use some Spring Integration annotations to process the messages in incomingMqttMessageChannel, e.g.:

  @Transformer(inputChannel = "incomingMqttMessageChannel", outputChannel = "entityChannel")
  public Entity transform(byte[] mqttMessage){
    //transform mqtt message to other Entity
  }

I performed some tests and I realized that with this code messages were processed one by one.

I want to process the MQTT messages I receive in parallel using a thread pool, not running several Spring applications.

According to this the MqttPahoMessageDrivenChannelAdapter is single-threaded.

Is there any way to parallelize message processing in this case? Which are the options I have?

Thanks in advance.


Solution

  • Make your incomingMqttMessageChannel as an Executor channel:

    .channel(c -> c.executor("incomingMqttMessageChannel", threadPoolTaskExecutor))
    

    This way your MQTT messages are going to consumed from that channel from threads of that executor.

    See more info in docs: https://docs.spring.io/spring-integration/reference/channel/implementations.html#executor-channel