I followed the asynchronous publisher example from pika and tried to run its self._connection.ioloop.start() in a separate thread. So far I have managed to use a queue that the main thread fills with messages to publish, but the only way I found for the publisher thread to take messages from the queue is not satisfactory. I used something like:
    try:
        message = self._queue.get(True, 1)
        self._channel.basic_publish(body=message, exchange=self._exchange, routing_key='example.text')
    except queue.Empty:
        pass
    finally:
        self._connection.add_timeout(0.0001, self.publish_message)
There has to be a better way to do this, right? It's important to note that I'm using this with Python 3.6.4 on Windows, and the IO loop chosen by pika.SelectConnection seems to be very limited...
Edit: I just found out how to use adapters.AsyncioConnection instead of SelectConnection. So now I can replace self._connection.add_timeout(0.0001, self.publish_message) with self._connection.loop.call_soon(self.publish_message).
This gives very weird results: messages seem to be buffered and sent about once every second. I'm new to Python, so I would greatly appreciate some insight!
The correct answer to this question, in case it still comes up in search results, is to upgrade to at least Pika v0.12 and take advantage of the method add_callback_threadsafe available for various connection adapters. Here is an example.
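As a rough illustration of that approach, here is a minimal sketch. The helper name publish_from_any_thread and its schedule parameter are made up for this example and are not part of pika. The idea is that the main thread never touches the channel itself: it only asks the connection's own thread to run basic_publish, so the queue and the polling timer are no longer needed. The blocking self._queue.get(True, 1) inside the loop callback is also the likely reason messages appeared to be sent in batches once per second.

```python
import functools


def publish_from_any_thread(schedule, channel, exchange, routing_key, body):
    """Hand a publish over to the thread that owns the pika connection.

    `schedule` is assumed to be the adapter's thread-safe scheduling
    method, e.g. connection.add_callback_threadsafe (Pika >= 0.12).
    Only `schedule` is called from the current thread; basic_publish
    runs later, on the connection's own thread.
    """
    publish = functools.partial(
        channel.basic_publish,
        exchange=exchange,
        routing_key=routing_key,
        body=body,
    )
    schedule(publish)
```

With a BlockingConnection, schedule would be connection.add_callback_threadsafe. For SelectConnection, as far as I know the equivalent lives on the IO loop in Pika 1.x (connection.ioloop.add_callback_threadsafe), so it is worth checking against the version you have installed. The main thread then simply calls publish_from_any_thread(schedule, channel, exchange, 'example.text', message) whenever it has something to send.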