rabbitmq mqtt libmosquitto

Why does libmosquitto close the connection after publishing the first message to RabbitMQ used as an MQTT broker?


I am doing this on Windows with WSL; I don't know whether that is related.
I created a simple broker with Docker Compose:

version: "3.9"

services:
  # Create service with RabbitMQ.
  broker:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "1883:1883" # For MQTT
      - "8080:15672" # for serve RabbitMQ GUI
    volumes:
      - ./rabbitmq/:/etc/rabbitmq/:ro

The ./rabbitmq/ folder contains a basic configuration:
rabbitmq.conf

log.file.level = debug
loopback_users.guest = false
management.load_definitions = /etc/rabbitmq/definitions.json

# MQTT config
mqtt.vhost    = /
mqtt.allow_anonymous = false

definitions.json

{
    "users": [
        {
        "name": "guest",
        "password": "guest",
        "tags": "administrator"
        },
        {
            "name": "mqtt",
            "password": "mqtt",
            "tags": "management"
        }
    ],
    "vhosts": [
        {
        "name": "/"
        }
    ],
    "permissions": [
        {
            "user": "mqtt",
            "vhost": "/",
            "configure": ".*",
            "write": ".*",
            "read": ".*"
        }
    ],
    "queues": [
        {
        "name": "mqtt",
        "vhost": "/",
        "durable": true,
        "auto_delete": false,
        "arguments": {}
        }
    ]
}

enabled_plugins

[rabbitmq_federation_management,rabbitmq_management,rabbitmq_mqtt].

Inside another container, I installed the libmosquitto library and compiled a small example taken from the mosquitto repo:

/*
 * This example shows how to publish messages from outside of the Mosquitto network loop.
 */

#include <mosquitto.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>


/* Callback called when the client receives a CONNACK message from the broker. */
void on_connect(struct mosquitto *mosq, void *obj, int reason_code)
{
    printf("on_connect: %s\n", mosquitto_connack_string(reason_code));
    if(reason_code != 0){
        mosquitto_disconnect(mosq);
    }

    /* You may wish to set a flag here to indicate to your application that the
     * client is now connected. */
}

void on_publish(struct mosquitto *mosq, void *obj, int mid)
{
    printf("Message with mid %d has been published.\n", mid);
}


int get_temperature(void)
{
    sleep(1); /* Prevent a storm of messages - this pretend sensor works at 1Hz */
    return random()%100;
}

/* This function pretends to read some data from a sensor and publish it.*/
void publish_sensor_data(struct mosquitto *mosq)
{
    char payload[20];
    int temp;
    int rc;

    /* Get our pretend data */
    temp = get_temperature();
    /* Print it to a string for easy human reading - payload format is highly
     * application dependent. */
    snprintf(payload, sizeof(payload), "%d", temp);

    rc = mosquitto_publish(mosq, NULL, "example/temperature", strlen(payload), payload, 2, false);
    if(rc != MOSQ_ERR_SUCCESS){
        fprintf(stderr, "Error publishing: %s\n", mosquitto_strerror(rc));
    }
}


int main(int argc, char *argv[])
{
    struct mosquitto *mosq;
    int rc;

    /* Required before calling other mosquitto functions */
    mosquitto_lib_init();

    mosq = mosquitto_new(NULL, true, NULL);
    if(mosq == NULL){
        fprintf(stderr, "Error: Out of memory.\n");
        return 1;
    }

    /* Set the username and password defined in definitions.json. */
    mosquitto_username_pw_set(mosq, "mqtt", "mqtt");
    /* Configure callbacks. This should be done before connecting ideally. */
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_publish_callback_set(mosq, on_publish);

    rc = mosquitto_connect(mosq, "host.docker.internal", 1883, 60);
    if(rc != MOSQ_ERR_SUCCESS){
        mosquitto_destroy(mosq);
        fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
        return 1;
    }

    /* Run the network loop in a background thread, this call returns quickly. */
    rc = mosquitto_loop_start(mosq);
    if(rc != MOSQ_ERR_SUCCESS){
        mosquitto_destroy(mosq);
        fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
        return 1;
    }

    while(1){
        publish_sensor_data(mosq);
    }

    mosquitto_lib_cleanup();
    return 0;
}

the program output is:

root@a6c0bc2147da:/src# ./a.out
on_connect: Connection Accepted.
[... nothing more appears ...]

And the log of rabbitmq is:

2023-03-14 20:14:39.346965+00:00 [debug] <0.1589.0> MQTT accepting TCP connection <0.1589.0> (172.22.0.1:57814 -> 172.22.0.2:1883)
2023-03-14 20:14:39.347458+00:00 [debug] <0.1589.0> Received a CONNECT, client ID: "mosq-hdlnBrkmJ6xRctY5t3" (expanded to "mosq-hdlnBrkmJ6xRctY5t3"), username: "mqtt", clean session: true, protocol version: 4, keepalive: 60
2023-03-14 20:14:39.347658+00:00 [debug] <0.1589.0> MQTT vhost picked using plugin configuration or default
2023-03-14 20:14:39.348422+00:00 [debug] <0.1593.0> User 'mqtt' authenticated successfully by backend rabbit_auth_backend_internal
2023-03-14 20:14:39.351230+00:00 [info] <0.1589.0> accepting MQTT connection <0.1589.0> (172.22.0.1:57814 -> 172.22.0.2:1883, client id: mosq-hdlnBrkmJ6xRctY5t3)
2023-03-14 20:14:40.351415+00:00 [info] <0.1589.0> MQTT connection "172.22.0.1:57814 -> 172.22.0.2:1883" will terminate because peer closed TCP connection
2023-03-14 20:14:40.351534+00:00 [debug] <0.1589.0> MQTT: about to send will message (if any) on connection "172.22.0.1:57814 -> 172.22.0.2:1883"
2023-03-14 20:14:40.352099+00:00 [debug] <0.1635.0> Closing all channels from connection '172.22.0.1:57814 -> 172.22.0.2:1883' because it has been closed

If I change the host from host.docker.internal to test.mosquitto.org, it works fine and the publish callback is called every second.
I have already tried multiple configurations of mosquitto_connect (and mosquitto_connect_async), mosquitto_loop (and mosquitto_loop_start), keepalives, and timeouts.
I don't understand why this error happens. Can someone help me?


Solution

  • I found this in the RabbitMQ MQTT documentation:

    RabbitMQ does not support QoS2 subscriptions. RabbitMQ automatically downgrades QoS 2 publishes and subscribes to QoS 1. Messages published as QoS 2 will be sent to subscribers as QoS 1.
    Subscriptions with QoS 2 will be downgraded to QoS1 during SUBSCRIBE request (SUBACK responses will contain the actually provided QoS level).
    

    To make it work properly, you need to publish the messages to RabbitMQ with QoS 1. I am not sure whether this is a bug or a feature.
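
    One way to keep the publish call from ever requesting QoS 2 is to clamp the requested level before passing it to mosquitto_publish. The following is only a minimal sketch: the helper clamp_qos and the constant BROKER_MAX_QOS are hypothetical names, not part of libmosquitto, and the value 1 is an assumption based on the documentation quoted above.

```c
#include <assert.h>

/* Assumption: highest QoS the broker handles for publishes.
 * Set to 1 for RabbitMQ, based on the documentation quoted above. */
#define BROKER_MAX_QOS 1

/* Hypothetical helper (not part of libmosquitto): clamp a requested MQTT
 * QoS level to the broker's maximum. Negative requests are treated as 0. */
int clamp_qos(int requested, int broker_max)
{
    /* MQTT defines only QoS levels 0, 1 and 2. */
    assert(broker_max >= 0 && broker_max <= 2);

    if (requested < 0) {
        return 0;
    }
    return requested > broker_max ? broker_max : requested;
}
```

    In publish_sensor_data from the question, the QoS argument 2 in the mosquitto_publish call would then become clamp_qos(2, BROKER_MAX_QOS), which evaluates to 1 with the value assumed here. Simply hard-coding 1 in that call has the same effect.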