Search code examples
mqttpahokapua

Eclipse Kapua broker : Not authorized to subscribe to topic


I'm trying to subscribe to a simple topic "foo" from an Eclipse Paho MQTT client.

The broker is managed by Eclipse Kapua and accessible via tcp://localhost:1883 with credentials "kapua-broker" and "kapua-password".

I'm publishing a value this way:

send(new Payload.Builder().put("testKey","testVal"),"foo");

This basically sends a map ("testKey","testVal") with topic "foo". To subscribe to this topic, I have the following code (host="localhost", port=1883):

    String topic = "foo";
    String broker = "tcp://"+host+":"+Integer.toString(port);
    String clientId = "supply-chain-control-simulation-listener";
    String username = "kapua-broker";
    String password = "kapua-password";

    try {
        MqttClient client = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName(username);
        connOpts.setPassword(password.toCharArray());
        connOpts.setCleanSession(true);
        logger.info("Connecting to broker: "+broker);
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                logger.info("Subscriptions stopped");
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                logger.info(s);
                logger.info(mqttMessage.getPayload().toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });
        client.connect(connOpts);
        if (client.isConnected())
            logger.info("Connected");
        else
            logger.error(client.getDebug().toString());
        client.subscribe(topic,2);
    } catch(MqttException me) {
        logger.error("reason "+me.getReasonCode());
        logger.error("msg "+me.getMessage());
        logger.error("loc "+me.getLocalizedMessage());
        logger.error("cause "+me.getCause());
        logger.error("excep "+me);
        me.printStackTrace();
    }

The connection works, but the subscription outputs this error:

15:40:03.240 [ActiveMQ NIO Worker 0] WARN o.e.k.b.c.p.KapuaSecurityBrokerFilter - User 1:kapua-broker (supply-chain-control-simulation-listener - tcp://172.17.0.1:40888 - conn id 1734706196170193882) is not authorized to read from: topic://VirtualTopic.foo


Solution

  • In Kapua you are allowed to publish/subscribe according to your user permission.

    If your user has only broker:connect permission you can publish/subscribe only on topic:

    {account-name}/{connectionClientId}/{semanticTopic}
    

    In your specific case you should publish/subscribe on topic:

    kapus-sys/supply-chain-control-simulation-listener/foo
    

    kapua-sys is the account name to which the user kapua-broker belongs, while supply-chain-control-simulation-listener is the clientId used to create the connection.

    Please note that username used to connect and account name are two different things in Kapua. An account has multiple users.