Search code examples
pythonsasleclipse-honoqpid-proton

Connect to Eclipse Hono AMQP Adaptor using python proton


I'm currently trying to send a telemetry message via AMPQ Adaptor to the Hono Sandbox. All though i took over parts of the code sample seen in Hono Noth bridge example (which should work for the south bridge as well) I struggle a bit with the SASL as it seems.

Here is my code

from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

tenantId = 'xxxx'
deviceId = 'yyyyy'
devicePassword = 'my-secret-password'


class AmqpMessageSender(MessagingHandler):
    def __init__(self, server, address):
        super(AmqpMessageSender, self).__init__()
        self.server = server
        self.address = address

    def on_start(self, event):
        conn = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )    
        event.container.create_sender(conn, self.address)

    def on_sendable(self, event):   
        msg = Message(
            address=f'{self.address}/{deviceId}',
            content_type='application/json',
            body={"temp": 5, "transport": "amqp"}
        )
        event.sender.send(self.msg)
        event.sender.close()

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")


Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5671', f'telemetry/{tenantId}')).run()

If I run the code, I get a transport error with the context condition

'Expected SASL protocol header: no protocol header found (connection aborted)'

I tried also with port 5672 which got me a link error and using port 15672 (which actually is the north bridge port) which - to my surprise, didn't cause a SASL error but got me the expected "not authorized" error (as the device is not allowed to connect via the north bridge)

======= update=======

once more thank you for you time.

regarding a) since comments are rather limited here once agian the code as a answer to question. The code i use to simulate the device is as follwoing

from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

tenantId = 'xxx'
deviceId = 'yyy'
devicePassword = 'my-secret-password'


class AmqpMessageSender(MessagingHandler):
    def __init__(self, server):
        super(AmqpMessageSender, self).__init__()
        self.server = server

    def on_start(self, event):
        print("In start")
        conn = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )
        print("connection established")
        event.container.create_sender(context=conn, target=None)
        print("sender created")

    def on_sendable(self, event):
        print("In Msg send")
        event.sender.send(Message(
            address=f'telemetry',
            properties={
                'to': 'telemetry',
                'content-type': 'application/json'
            },
            content_type='application/json',
            body={"temp": 5, "transport": "amqp"}
        )) 
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")

Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()

To simulate a server I do not use the java client, but use the sample code from the python quick start example as well. I have also a client class that does the http call as in the python quick start example an to that the server class reacts and prints the message - so the server implementation as outlined below should be ok from my understanding:

from __future__ import print_function, unicode_literals
import threading
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container

amqpNetworkIp = "hono.eclipseprojects.io"
tenantId = 'xxx'


class AmqpReceiver(MessagingHandler):
    def __init__(self, server, address, name):
        super(AmqpReceiver, self).__init__()
        self.server = server
        self.address = address
        self._name = name

    def on_start(self, event):
        conn = event.container.connect(self.server, user="consumer@HONO", password="verysecret")
        event.container.create_receiver(conn, self.address)

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_message(self, event):
        print(self._name)
        print("Got a message:")
        print(event.message.body)


class CentralServer:
    def listen_telemetry(self, name):
        uri = f'amqp://{amqpNetworkIp}:15672'
        address = f'telemetry/{tenantId}'
        self.container = Container(AmqpReceiver(uri, address, name))

        print("Starting (northbound) AMQP Connection...")
        self.thread = threading.Thread(target=lambda: self.container.run(), daemon=True)
        self.thread.start()
        time.sleep(2)

    def stop(self):
        # Stop container
        print("Stopping (northbound) AMQP Connection...")
        self.container.stop()
        self.thread.join(timeout=5)


CentralServer().listen_telemetry('cs1')

after another day trying i couldn't find what i do wrong i really hope you see where i miss something :)

br Armin


Solution

  • The AMQP protocol adapter requires devices to send messages via an anonymous terminus.

    In your code, this means that the on_start method needs to be changed to contain event.container.create_sender(context=conn, target=None).

    In any case, the non-TLS port of the AMQP adapter is 5672, so you should use amqp://hono.eclipseprojects.io:5672 as the server address. The second parameter to the constructor (telemetry) is irrelevant and can be removed.

    Also make sure that you have a consumer running for your tenant. Otherwise, the sender will not get any credits for actually sending messages ...

    Edited Oct. 21st, 2021

    This code works for me ...

    class AmqpMessageSender(MessagingHandler):
        def __init__(self, server):
            super(AmqpMessageSender, self).__init__()
            self.server = server
    
        def on_start(self, event):
            print("In start")
            conn = event.container.connect(
                url=self.server,
                sasl_enabled=True,
                allowed_mechs="PLAIN",
                allow_insecure_mechs=True,
                user=f'{deviceId}@{tenantId}',
                password=devicePassword
            )
            print("connection established")
            event.container.create_sender(context=conn, target=None)
            print("sender created")
    
        def on_sendable(self, event):
            print("In Msg send")
            event.sender.send(Message(
                address=f'telemetry',
                content_type='application/json',
                body="{\"temp\": 5, \"transport\": \"amqp\"}"
            )) 
            event.sender.close()
            event.connection.close()
            print("Sender & connection closed")
    
        def on_connection_error(self, event):
            print("Connection Error")
    
        def on_link_error(self, event):
            print("Link Error")
    
        def on_transport_error(self, event):
            print("Transport Error")
            print(event)
    
    Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()