
Can we batch messages from Mosquitto into groups of 10 using Spring Integration?


This is how I have defined my MQTT connection using Spring Integration. I am not sure whether this is possible, but can we set up an MQTT subscriber that runs only after 10 messages have accumulated? Right now the subscriber runs after each published message, as it should.

    @Autowired
    ConnectorConfig config;


    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(config.getUrl());
        factory.setUserName(config.getUser());
        factory.setPassword(config.getPass());
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(config.getClientid(), mqttClientFactory(), "ALERT", "READING");

        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttRouterChannel());
        return adapter;
    }

    /** This is the router. */
    @MessageEndpoint
    public class MessageRouter {

        private final Logger logger = LoggerFactory.getLogger(MessageRouter.class);

        static final String ALERT = "ALERT";
        static final String READING = "READING";

        // Pick the target channel from the MQTT topic header of each message.
        @Router(inputChannel = "mqttRouterChannel")
        public String route(@Header("mqtt_topic") String topic) {
            String route = null;
            switch (topic) {
                case ALERT:
                    logger.info("alert message received");
                    route = "alertTransformerChannel";
                    break;
                case READING:
                    logger.info("reading message received");
                    route = "readingTransformerChannel";
                    break;
            }
            return route;
        }
    }

Solution

  • I need to batch up groups of 10 messages at a time

    That is not the responsibility of MqttPahoMessageDrivenChannelAdapter.

    Internally it uses Paho's MqttCallback, whose contract delivers one message per call:

    /**
     * @param topic name of the topic on the message was published to
     * @param message the actual message.
     * @throws Exception if a terminal error has occurred, and the client should be
     * shut down.
     */
    public void messageArrived(String topic, MqttMessage message) throws Exception;
    

    So, because the Paho client delivers messages one at a time, they can't be batched in this channel adapter.

    What we can suggest from the Spring Integration perspective is the Aggregator EIP implementation.
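To make the idea concrete, here is a minimal plain-Java sketch of what an aggregator does conceptually with a per-message callback: it buffers each arriving message and hands out a list once 10 have been collected. It is only an illustration, not Paho or Spring Integration code; the class `CountBatcher` and its methods `add` and `flush` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper; not part of Paho or Spring Integration. */
class CountBatcher<T> {

    private final int batchSize;
    private List<T> buffer = new ArrayList<>();

    CountBatcher(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
    }

    /** Called once per arriving message; returns a batch only when it is full. */
    synchronized Optional<List<T>> add(T message) {
        buffer.add(message);
        if (buffer.size() < batchSize) {
            return Optional.empty();
        }
        return Optional.of(drain());
    }

    /** Releases whatever is buffered so far, e.g. when a timeout fires. */
    synchronized List<T> flush() {
        return drain();
    }

    // Hand out the current buffer and start a fresh one for the next batch.
    private List<T> drain() {
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        return batch;
    }
}
```

In Spring Integration terms, `add` roughly corresponds to a count-based release strategy and `flush` to sending a partial group when the group times out. The framework's aggregator also handles correlation, message stores and scheduling, which this sketch leaves out.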

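    In your case, add a @ServiceActivator on an AggregatorFactoryBean @Bean in front of mqttRouterChannel, so that messages are aggregated before they reach the router. The inbound adapter's output channel then has to be mqttAggregatorChannel instead of mqttRouterChannel.

    That may be as simple as: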

    @Bean
    @ServiceActivator(inputChannel = "mqttAggregatorChannel")
    AggregatorFactoryBean mqttAggregator() {
        AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
        aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
        // Constant correlation key: every message goes into the same group.
        aggregator.setCorrelationStrategy(m -> 1);
        // Release the group as soon as it holds 10 messages.
        aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(10));
        // Remove the completed group so the next batch can reuse the same key.
        aggregator.setExpireGroupsUponCompletion(true);
        // On timeout (in milliseconds), send whatever has arrived so far.
        aggregator.setSendPartialResultOnExpiry(true);
        aggregator.setGroupTimeoutExpression(new ValueExpression<>(1000));
        aggregator.setOutputChannelName("mqttRouterChannel");
        return aggregator;
    }
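One caveat with the bean above: with a constant correlation key, ALERT and READING messages land in the same group. DefaultAggregatingMessageGroupProcessor only carries over headers whose values do not conflict across the group, so a mixed batch would lose the mqtt_topic header that the router in the question relies on. A possible variant, offered as a sketch rather than the definitive setup, correlates by that header so each batch contains a single topic. The class name `BatchingConfig` and the explicit `mqttAggregatorChannel` bean are assumptions for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.messaging.MessageChannel;

@Configuration
public class BatchingConfig { // hypothetical class name

    // The inbound adapter should now send here instead of to mqttRouterChannel,
    // e.g. adapter.setOutputChannel(mqttAggregatorChannel());
    @Bean
    public MessageChannel mqttAggregatorChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttAggregatorChannel")
    public AggregatorFactoryBean mqttAggregator() {
        AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
        aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
        // One group per topic, so mqtt_topic is identical within a batch
        // and survives header aggregation.
        aggregator.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy("mqtt_topic"));
        aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(10));
        aggregator.setExpireGroupsUponCompletion(true);
        aggregator.setSendPartialResultOnExpiry(true);
        aggregator.setGroupTimeoutExpression(new ValueExpression<>(1000L));
        aggregator.setOutputChannelName("mqttRouterChannel");
        return aggregator;
    }
}
```

After aggregation the payload is a List of the original payloads, so whatever consumes alertTransformerChannel and readingTransformerChannel needs to accept a list rather than a single message body.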
    

    See the Reference Manual for more information.