Search code examples
javaspring-integrationmqttpaho

How to customize MqttSubscription in Spring Integration?


I'm using org.eclipse.paho.mqttv5.client in Spring Integration and trying to set no local option in mqtt, like this:

    @Bean
    public MessageProducer inbound(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
        Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
                clientManager,
                "test"
        );
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        adapter.connectComplete(true);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

but Mqttv5PahoMessageDrivenChannelAdapter has no method to set MqttSubscription (which has configuration of mqtt's no-local)

In Mqttv5PahoMessageDrivenChannelAdapter class, it has a method subscribe:

    private void subscribe() {
        var clientManager = getClientManager();
        if (clientManager != null && this.mqttClient == null) {
            this.mqttClient = clientManager.getClient();
        }

        String[] topics = getTopic();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        this.topicLock.lock();
        try {
            if (topics.length == 0) {
                return;
            }

            int[] requestedQos = getQos();
            MqttSubscription[] subscriptions = IntStream.range(0, topics.length)
                    .mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i]))
                    .toArray(MqttSubscription[]::new);
            IMqttMessageListener listener = this::messageArrived;
            IMqttMessageListener[] listeners = IntStream.range(0, topics.length)
                    .mapToObj(t -> listener)
                    .toArray(IMqttMessageListener[]::new);
            this.mqttClient.subscribe(subscriptions, null, null, listeners, null)
                    .waitForCompletion(getCompletionTimeout());
            String message = "Connected and subscribed to " + Arrays.toString(topics);
            logger.debug(message);
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
            }
        }
        catch (MqttException ex) {
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
            }
            logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
        }
        finally {
            this.topicLock.unlock();
        }
    }

but it create MqttSubscription only with params topic and qos: MqttSubscription[] subscriptions = IntStream.range(0, topics.length).mapToObj(i -> new MqttSubscription(topics[i], requestedQos[i])).toArray(MqttSubscription[]::new);


Solution

  • This is something we have missed when we introduced MQTT v5 support.

    It looks like we have to introduce something like MqttSubscription-based constructors as an alternative option to the plain topic and its qos. This way you will be able to fine-grain configuration for each subscription.

    Please, raise a GH issue and we will address it for the next Spring Integration version.

    As a workaround I only can suggestion to use Paho API directly. The custom MessageProducerSupport impl can be used to wire it with the rest of integration flow in your project.