I am trying to run an application that waits for a specific nats message before continuing. So I made the following class that sends messages and also listens to messages:
#!/usr/bin/python3
from nats.aio.client import Client as NATS
import asyncio
class NatsService:
def __init__(self):
self.NATSERVER = "XXXXX"
self.CHANNEL = "msgchannel"
self.CONNECTTIMEOUT = 1
self.MAXRECONNECTATTEMPTS = 1
self.RECONNECTTIMEWAIT = 1
self.nc = NATS()
async def send_message(self, message, channel=None):
if not channel:
channel = self.CHANNEL
print("connecting to nats server")
await self.nc.connect(self.NATSERVER, self.CONNECTTIMEOUT,
max_reconnect_attempts=self.MAXRECONNECTATTEMPTS,
reconnect_time_wait=self.RECONNECTTIMEWAIT)
print(f"Publishing message: '{message}' to channel: {channel}")
await self.nc.publish(channel, message.encode('utf-8'))
print("message sent, closing connection")
await self.nc.close()
print("nats server connection closed")
def start_listening(self):
loop = asyncio.get_event_loop()
try:
loop.create_task(self.listener_loop(loop))
loop.run_forever()
finally:
loop.close()
async def listener_loop(self, loop):
print("connecting to nats listener loop")
await self.nc.connect(self.NATSERVER, loop=loop)
async def message_handler(msg):
subject = msg.subject
data = msg.data.decode()
print('Received a message on {}: {}'.format(subject, data))
if eval(data.split(":::")[1]):
print("message received, closing")
await nc.drain() # timeout occurs for some reason
print("stopping loop")
loop.stop()
await self.nc.subscribe(self.CHANNEL, cb=msg_handler)
I am importing this class in two applications, one that's supposed to send the messages and one that is supposed to listen for those messages until the correct one is received.
My main application listens for messages and only continues when it has received the correct message
from nats_service import NatsService
try:
print("starting nats service instance")
ns = NatsService()
print("listening for approved message")
start_listening()
except Exception as e:
print(f"Error: {e}")
print(f"Contiuing with application...")
the other application is meant to send messages:
from nats_service import NatsService
import asyncio
async def main():
ns = NatsService()
message = "test"
await ns.send_message(message)
if __name__=="__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print(f"Completed sender function.")
I am able to send and receive messages on standalone functions that I created before I put it all together. But I can't seem to run them when importing them from the class above, running especially into problems with asyncio.
After some trial and error, when I run the sender, it finally seems to start but immediately fails with an error I don't understand and can't find much info about:
connecting to nats server
nats: encountered error
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1185, in _select_next_server
connection_future, self.options['connect_timeout']
File "/usr/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
return fut.result()
File "/usr/lib/python3.7/asyncio/streams.py", line 75, in open_connection
protocol = StreamReaderProtocol(reader, loop=loop)
File "/usr/lib/python3.7/asyncio/streams.py", line 227, in __init__
self._closed = self._loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Traceback (most recent call last):
File "sender_side.py", line 13, in <module>
loop.run_until_complete(main())
File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "sender_side.py", line 9, in main
await ns.send_message(message)
File "/home/bot10-sigma/nats_tests/nats_service.py", line 23, in send_message
reconnect_time_wait=self.RECONNECTTIMEWAIT)
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 317, in connect
await self._select_next_server()
File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1174, in _select_next_server
self.options["reconnect_time_wait"], loop=self._loop
File "/usr/lib/python3.7/asyncio/tasks.py", line 563, in sleep
future = loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Exception ignored in: <function StreamReaderProtocol.__del__ at 0x761cacd8>
Traceback (most recent call last):
File "/usr/lib/python3.7/asyncio/streams.py", line 271, in __del__
AttributeError: 'StreamReaderProtocol' object has no attribute '_closed'
You are passing self.CONNECTTIMEOUT
as a positional parameter to Client.connect
where it expects a reference for its io_loop
parameter. That is why you get an AttributeError
: an int
doesn't have a create_future
attribute. Pass the timeout as connect_timeout=self.CONNECTTIMEOUT
and this problem should go away.