Tags: python, sockets, stream, python-asyncio, fastapi

AsyncIO Streams work with FastAPI but don't work standalone


I have this class for handling a socket connection asynchronously.

# socket_service.py
import asyncio

# host and port are assumed to be defined elsewhere (not shown in the question)

class Sockets:
    reader: asyncio.StreamReader
    writer: asyncio.StreamWriter
    message_queue = []

    async def start(self):
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=5
        )
        self.reader = reader
        self.writer = writer
        loop = asyncio.get_running_loop()
        loop.create_task(self.read())
        loop.create_task(self.write())

    async def read(self):
        while True:
            response = await asyncio.wait_for(
                self.reader.read(),
                timeout=60,
            )
            if response:
                self.message_queue.append(response)
            await asyncio.sleep(1)

    async def write(self):
        while True:
            if self.message_queue:
                self.writer.write(self.message_queue.pop(0))
                await self.writer.drain()
            else:
                # Yield to the event loop while the queue is empty
                await asyncio.sleep(0.1)

And I run it like this with FastAPI and Uvicorn:

# application.py
from fastapi import FastAPI

def register_startup_event(app: FastAPI):
    @app.on_event("startup")
    async def _startup() -> None:
        app.state.session = Sockets()
        await app.state.session.start()
    return _startup

def get_app():
    app = FastAPI()
    register_startup_event(app)
    return app

# __main__.py
import uvicorn

def main():
    uvicorn.run(
        "application_folder.application:get_app",
        workers=1,
        factory=True,
    )


if __name__ == "__main__":
    main()

And it works perfectly in FastAPI! But when I try to run it manually, it fails.

# manual_start.py
import asyncio

async def init():
    session = Sockets()
    await session.start()

  1. In this case, the application terminates immediately, as if the infinite loop had run once and exited. Output: Process finished with exit code 0

# CASE 1
if __name__ == "__main__":
    asyncio.run(init())

  2. In both of these cases, the application does not terminate immediately, but it receives no messages on the socket. Printing the response shows zero bytes, b'', as if the server were sending nothing, although when I launch the FastAPI stack right afterwards, everything works and all the data arrives.

# CASE 2
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(init())
    loop.run_forever()

# CASE 3
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init())
    loop.run_forever()

I'm sure that FastAPI or Uvicorn is doing some kind of asynchronous magic under the hood that I can't figure out, and I can't get my class to work on its own. What could be the problem? I could settle for using it only inside FastAPI, but I need it to work separately.

P.S. I asked ChatGPT; it suggested removing wait_for, adding awaits, or swapping the order of the function calls. Whatever it advises, nothing works for me, yet with FastAPI everything works.


Solution

  • The issue is that init waits for the connection to open, but read and write are then started as background tasks, so init returns right away. With asyncio.run(init()), the event loop shuts down as soon as init returns, and your program quits. You need something to keep the program running. In the FastAPI case, Uvicorn handles this for you by keeping the event loop alive. If you want to run it as a standalone program, try something like this:

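    import asyncio

    async def main():
        session = Sockets()
        await session.start()

        # Keep this coroutine alive so the background read/write tasks keep running
        while True:
            await asyncio.sleep(0.2)

    if __name__ == "__main__":
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            # Ctrl+C surfaces from asyncio.run(), not inside the coroutine
            pass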
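
To see the effect in isolation, here is a minimal sketch that needs no socket server. Ticker, run_forever, returns_immediately and stays_alive are hypothetical names invented for this illustration; Ticker only stands in for the Sockets class. It assumes the background tasks are stored on the object so they can be awaited, which is one possible alternative to sleeping in a loop.

```python
import asyncio


class Ticker:
    """Hypothetical stand-in for Sockets: start() only spawns a background task."""

    def __init__(self):
        self.ticks = 0
        self.tasks = []  # references to the background tasks

    async def start(self):
        # Keep a reference so the task can be awaited later
        # (and is not garbage-collected while it runs).
        self.tasks.append(asyncio.create_task(self._tick()))

    async def _tick(self):
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    async def run_forever(self):
        # Suspends the caller for as long as the background tasks are alive.
        await asyncio.gather(*self.tasks)


async def returns_immediately():
    ticker = Ticker()
    await ticker.start()
    return ticker  # the main coroutine ends here, so asyncio.run() shuts down


async def stays_alive(seconds):
    ticker = Ticker()
    await ticker.start()
    try:
        # The timeout exists only so this demo terminates;
        # a real program would await run_forever() without it.
        await asyncio.wait_for(ticker.run_forever(), timeout=seconds)
    except asyncio.TimeoutError:
        pass
    return ticker


early = asyncio.run(returns_immediately())
late = asyncio.run(stays_alive(0.2))

# The leftover task was cancelled when asyncio.run() finished.
print("cancelled on exit:", early.tasks[0].cancelled())
print("ticks:", early.ticks, "vs", late.ticks)
```

In the first run the background task is cancelled almost before it starts, which matches the "exit code 0" behaviour from the question. In the second run the main coroutine stays suspended on the tasks, so they keep ticking. Applied to Sockets, the same idea would be to store the two tasks created in start() and have the entry point await them.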