I'm trying to publish a MQTT message and receive the message with an AMQP consumer by using the RabbitMQ-MQTT plugin on Ubuntu14.04. I'm publishing the MQTT message with the Mosquitto-clients package. I enabled the MQTT plugin for RabbitMQ.
Now if I want to send a MQTT message, my AMQP consumer code throws an exception:
Traceback (most recent call last):
File "consume_topic.py", line 33, in <module>
channel.start_consuming()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 722, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 184, in _handle_read
super(BlockingConnection, self)._handle_read()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 308, in _handle_read
self._on_data_available(data)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1134, in _on_data_available
consumed_count, frame_value = self._read_frame()
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1201, in _read_frame
return frame.decode_frame(self._frame_buffer)
File "/usr/local/lib/python2.7/dist-packages/pika/frame.py", line 254, in decode_frame
out = properties.decode(frame_data[12:])
File "/usr/local/lib/python2.7/dist-packages/pika/spec.py", line 2479, in decode
(self.headers, offset) = data.decode_table(encoded, offset)
File "/usr/local/lib/python2.7/dist-packages/pika/data.py", line 106, in decode_table
value, offset = decode_value(encoded, offset)
File "/usr/local/lib/python2.7/dist-packages/pika/data.py", line 174, in decode_value
raise exceptions.InvalidFieldTypeException(kind)
pika.exceptions.InvalidFieldTypeException: b
My Pika (python) consumer code is the following:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',type='topic',durable=False)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
My RabbitMQ configuration file is the following:
[{rabbit, [{tcp_listeners, [5672]}]},
{rabbitmq_mqtt, [{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{allow_anonymous, true},
{vhost, <<"/">>},
{exchange, <<"logs">>},
{subscription_ttl, 1800000},
{prefetch, 10},
{ssl_listeners, []},
%% Default MQTT with TLS port is 8883
%% {ssl_listeners, [8883]}
{tcp_listeners, [1883]},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128},
{nodelay, true}]}]}
].
The log file shows the following:
=INFO REPORT==== 14-Apr-2015::10:57:50 ===
accepting AMQP connection <0.1174.0> (127.0.0.1:42447 -> 127.0.0.1:5672)
=INFO REPORT==== 14-Apr-2015::10:58:30 ===
accepting MQTT connection <0.1232.0> (127.0.0.1:53581 -> 127.0.0.1:1883)
=WARNING REPORT==== 14-Apr-2015::10:58:30 ===
closing AMQP connection <0.1174.0> (127.0.0.1:42447 -> 127.0.0.1:5672):
connection_closed_abruptly
=INFO REPORT==== 14-Apr-2015::10:58:30 ===
closing MQTT connection <0.1232.0> (127.0.0.1:53581 -> 127.0.0.1:1883)
Can anybody please help me? I googled the "pika.exceptions.IvalidFieldTypeException" and found that I'm not using a correct "Field Type", how is that?
This is most likely a bug in the specifications (decoder) for pika. I would recommend that you change library to something more frequently updated. As an example you could look at the author of pika's new library RabbitPy or my very own pika inspired library AMQP-Storm.
Although, it could also be that you are running a very old version of Pika. I found this commit from gmr that should have fixed your issue. You could try to upgrade to pika 0.9.14.