Search code examples
pythonpython-asyncionats.io

python nats connection error when running from a class


I am trying to run an application that waits for a specific nats message before continuing. So I made the following class that sends messages and also listens to messages:

#!/usr/bin/python3
from nats.aio.client import Client as NATS
import asyncio


class NatsService:

    def __init__(self):
        self.NATSERVER = "XXXXX"
        self.CHANNEL = "msgchannel"
        self.CONNECTTIMEOUT = 1
        self.MAXRECONNECTATTEMPTS = 1
        self.RECONNECTTIMEWAIT = 1
        self.nc = NATS()

    async def send_message(self, message, channel=None):
        if not channel:
            channel = self.CHANNEL

        print("connecting to nats server")
        await self.nc.connect(self.NATSERVER, self.CONNECTTIMEOUT,
                              max_reconnect_attempts=self.MAXRECONNECTATTEMPTS,
                              reconnect_time_wait=self.RECONNECTTIMEWAIT)

        print(f"Publishing message: '{message}' to channel: {channel}")
        await self.nc.publish(channel, message.encode('utf-8'))
        print("message sent, closing connection")
        await self.nc.close()
        print("nats server connection closed")

    def start_listening(self):
        loop = asyncio.get_event_loop()
        try:
            loop.create_task(self.listener_loop(loop))
            loop.run_forever()
        finally:
            loop.close()

    async def listener_loop(self, loop):
        print("connecting to nats listener loop")
        await self.nc.connect(self.NATSERVER, loop=loop)

        async def message_handler(msg):
            subject = msg.subject
            data = msg.data.decode()
            print('Received a message on {}: {}'.format(subject, data))

            if eval(data.split(":::")[1]):
                print("message received, closing")
                await nc.drain()    # timeout occurs for some reason
                print("stopping loop")
                loop.stop()
            
        await self.nc.subscribe(self.CHANNEL, cb=msg_handler)

I am importing this class in two applications, one that's supposed to send the messages and one that is supposed to listen for those messages until the correct one is received.

My main application listens for messages and only continues when it has received the correct message

from nats_service import NatsService

try:
    print("starting nats service instance")
    ns = NatsService()
    print("listening for approved message")
    start_listening()
except Exception as e:
    print(f"Error: {e}")

print(f"Contiuing with application...")

the other application is meant to send messages:

from nats_service import NatsService
import asyncio

async def main():
    ns = NatsService()
    message = "test"
    await ns.send_message(message)

if __name__=="__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print(f"Completed sender function.")

I am able to send and receive messages on standalone functions that I created before I put it all together. But I can't seem to run them when importing them from the class above, running especially into problems with asyncio.

After some trial and error, when I run the sender, it finally seems to start but immediately fails with an error I don't understand and can't find much info about:

connecting to nats server
nats: encountered error
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1185, in _select_next_server
    connection_future, self.options['connect_timeout']
  File "/usr/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
  File "/usr/lib/python3.7/asyncio/streams.py", line 75, in open_connection
    protocol = StreamReaderProtocol(reader, loop=loop)
  File "/usr/lib/python3.7/asyncio/streams.py", line 227, in __init__
    self._closed = self._loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Traceback (most recent call last):
  File "sender_side.py", line 13, in <module>
    loop.run_until_complete(main())
  File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "sender_side.py", line 9, in main
    await ns.send_message(message)
  File "/home/bot10-sigma/nats_tests/nats_service.py", line 23, in send_message
    reconnect_time_wait=self.RECONNECTTIMEWAIT)
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 317, in connect
    await self._select_next_server()
  File "/usr/local/lib/python3.7/dist-packages/nats/aio/client.py", line 1174, in _select_next_server
    self.options["reconnect_time_wait"], loop=self._loop
  File "/usr/lib/python3.7/asyncio/tasks.py", line 563, in sleep
    future = loop.create_future()
AttributeError: 'int' object has no attribute 'create_future'
Exception ignored in: <function StreamReaderProtocol.__del__ at 0x761cacd8>
Traceback (most recent call last):
  File "/usr/lib/python3.7/asyncio/streams.py", line 271, in __del__
AttributeError: 'StreamReaderProtocol' object has no attribute '_closed'

Solution

  • You are passing self.CONNECTTIMEOUTas a positional parameter to Client.connect where it expects a reference for its io_loop parameter. That is why you get an AttributeError: an int doesn't have a create_future attribute. Pass the timeout as connect_timeout=self.CONNECTTIMEOUT and this problem should go away.