Search code examples
pythoneclipse-honoeclipse-iot

Reply to Hono command on AMQP device


I'm trying to create a prototype device that is able to receive commands from hono and reply to it.

I've installed hono 1.10.0 and run the following python code

import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
from hono import tenantId, deviceId, devicePassword, device_uri, biz_app_uri


correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'

print("Business application subscribing for the command reply--------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, "consumer@HONO", "verysecret"))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
# Give it some time to link
time.sleep(5)


print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', f'{deviceId}@{tenantId}', devicePassword))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
# Give it some time to link
time.sleep(2)


print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], "consumer@HONO", "verysecret", address=f'command/{tenantId}')).run()
time.sleep(2)


print("Device sending a command response-----------------------------------------------------------------")
resp = Message(
    address=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    properties={
        'status': 200,
        'device_id': deviceId,
        'tenant_id': tenantId
    },
    subject="call",
    body="Hello Alice!"
)
Container(AmqpSender(device_uri, [resp], f'{deviceId}@{tenantId}', devicePassword)).run()
time.sleep(2)


print("Device stops listeing for commands----------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
print("Business application stops listening for command responsets---------------------------------------")
cr_container.stop()
cr_thread.join(timeout=5)
print("everything stopped")

I did this implementation with the help of Difficulty in Sending AMQP 1.0 Message and according to my understanding of https://www.eclipse.org/hono/docs/api/command-and-control/ and https://www.eclipse.org/hono/docs/user-guide/amqp-adapter/#sending-a-response-to-a-command.

For the moment i doesn't seem so wrong as the device receives the command, and also sending the message does not show any errors. However on the receiving end nothing arrives. Just to clarify, the AmqpReceiver implementation works for the scenario where I listen for telemetry data. Hence, if the implementation is supposed to be the same (aside a different address), then this should not be the issue.

I highly believe that I do something wrong with the address/reply_to in the message but I can't confirm as the logs in the hono pods don't tell me anything :(

br Armin

======update===============================

the code i currently run is the follwoing

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import AtLeastOnce


class Amqp(MessagingHandler):
    def __init__(self, server, address, user, password, options=None):
        super(Amqp, self).__init__()
        self.server = server
        self.address = address
        self.user = user
        self.password = password
        self.options = options
        self.connection = None

    def create_connection(self, event):
        self.connection = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=self.user,
            password=self.password
        )
        print("Connection established")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")

    def on_link_opened(self, event):
        if event.link.is_sender:
            print("Opened sender link")
        if event.link.is_receiver:
            print("Opened receiver link for source address '{0}'".format(event.receiver.source.address))


class AmqpReceiver(Amqp):
    def __init__(self, server, address, user, password, options=None):
        super(AmqpReceiver, self).__init__(server, address, user, password, options)
        self.server = server
        self.user = user
        self.password = password

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_receiver(context=self.connection, source=self.address, options=self.options)
        print("Receiver created")

    def on_message(self, event):
        print(f'Receiver [{self.address}] got message:')
        print(f'  {event.message.reply_to}')
        print(f'  {event.message.correlation_id}')
        print(f'  {event.message.properties}')
        print(f'  {event.message.subject}')
        print(f'  {event.message.body}')
        #just for test purposes - the device sends imediatelly the reply if a reply_to is given
        if event.message.reply_to is not None:
            reply_to = event.message.reply_to.split('/')
            tenant_id = reply_to[1]
            device_id = reply_to[2]
            resp = Message(
                address=event.message.reply_to,
                correlation_id=event.message.correlation_id,
                content_type="text/plain",
                properties={
                    'status': 200,
                    'tenant_id': tenant_id,
                    'device_id': device_id
                },
                body=f'Reply on {event.message.body}'
            )
            sender = event.container.create_sender(self.connection, None, options=AtLeastOnce())
            sender.send(resp)
            sender.close()
            print("Reply send")


class AmqpSender(Amqp):
    def __init__(self, server, messages, user, password, address=None, options=None):
        super(AmqpSender, self).__init__(server, address, user, password, options)
        self.messages = messages

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_sender(context=self.connection, target=self.address)
        print("Sender created")

    def on_sendable(self, event):
        print("In Msg send")
        for msg in self.messages:
            event.sender.send(msg)
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

and in the test script i use that as follows

from __future__ import print_function, unicode_literals
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver


biz_app_uri = f'amqp://localhost:15672'
device_uri = f'amqp://localhost:5672'
tenantId = 'ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8'
deviceId = 'b932fb15-fdbd-4c12-9ed7-40aaa8763412'

biz_app_user = 'consumer@HONO'
biz_app_pw = 'verysecret'
device_user = f'{deviceId}@{tenantId}'
device_pw = 'my-secret-password'

correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'


print("Business application subscribing for command replies-------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, biz_app_user, biz_app_pw))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
time.sleep(2)

print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', device_user, device_pw))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
time.sleep(2)

print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], biz_app_user, biz_app_pw, address=f'command/{tenantId}')).run()

time.sleep(10)
print("Device stops listeing for commands----------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
#print("Business application stops listening ---------------------------------------")
#cr_container.stop()
#cr_thread.join(timeout=5)
#print("everything stopped")

if i run that code sample i get the following logs (see bellow) and the code is stuck as the command reply receiver remains open.

log on hono dispatch router:

2021-11-14 19:08:29.420176 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:29.429734 +0000 SERVER (info) [C115] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36742
2021-11-14 19:08:29.447479 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:29.448213 +0000 ROUTER (info) [C115] Connection Opened: dir=in host=10.42.0.70:36742 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=a782f51c-9679-41fb-a682-8ea603ccf1ac props=
2021-11-14 19:08:29.448316 +0000 ROUTER_CORE (info) [C115][L123] Link attached: dir=out source={command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/myCorrelationId expire:sess} target={<none> expire:sess}
2021-11-14 19:08:33.423325 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:33.430810 +0000 SERVER (info) [C116] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36868
2021-11-14 19:08:33.445574 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:33.446328 +0000 ROUTER (info) [C116] Connection Opened: dir=in host=10.42.0.70:36868 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=92cb7173-2940-4330-a995-f26eccef0905 props=
2021-11-14 19:08:33.446388 +0000 ROUTER_CORE (info) [C116][L124] Link attached: dir=in source={<none> expire:sess} target={command/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8 expire:sess}
2021-11-14 19:08:33.447762 +0000 ROUTER_CORE (info) [C116][L124] Link detached: del=1 presett=0 psdrop=0 acc=0 rej=0 rel=0 mod=0 delay1=0 delay10=0 blocked=no

log on amqp adapter

2021-11-14 19:08:31,511 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Connected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null
2021-11-14 19:19:29,875 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Disconnected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null

Solution

  • It looks like the command response sent by your device contains the wrong address. As pointed out in the AMQP Adapter User Guide, the response's address property must be set to the value of the reply-to property of the command. That value is usually NOT the same as the reply-to value that your application sets in the command message because the protocol adapter needs to encode some additional information into the reply to address in order to be able to determine the correct device ID when forwarding the command response downstream.

    In your code you therefore need to inspect the command message at the device side and use its reply-to value as the command response's address value.

    In addition to that, the AMQP adapter expects the status property in the command response to be of AMQP 1.0 type int (a 32 bit signed integer). However, with your code the property value is encoded as an AMQP 1.0 long (64 bit signed integer) by default. In order to encode it correctly, you need to import the int32 class from proton._data and then set the property value as int32(200). Then the adapter accepts the command response and forwards it downstream.