Tags: python, redis, architecture, fastapi, message-queue

How to listen for events in a side thread?


I am building an app that collects data through the Binance WebSocket API. We already have a main codebase, but decided to implement the websockets in an independent app. It looks like this:

import asyncio
from threading import Thread

import uvicorn
from fastapi import FastAPI

from binance import AsyncClient, BinanceSocketManager

app = FastAPI()

@app.get("/")
async def root():
    return "Websocket app"


async def main_socket_stream():
    client = await AsyncClient.create()
    binance_manager = BinanceSocketManager(client=client)
    # `symbols` is defined elsewhere (omitted from this trimmed snippet).
    multiplex_socket = binance_manager.futures_multiplex_socket(symbols)

    async with multiplex_socket as active_socket:
        while True:
            result = await active_socket.recv()
            print(result)
            # A lot of further logic follows here: process the data, save something to the DB, etc.


def side_thread():
    # asyncio.run() creates and closes its own event loop for this thread,
    # so no manual new_event_loop()/set_event_loop() setup is needed.
    asyncio.run(main_socket_stream())


if __name__ == "__main__":
    thread = Thread(target=side_thread, args=(), daemon=True)
    thread.start()
    uvicorn.run(app, port=5105)

I removed some unnecessary parts, but the main idea should be clear. The websocket runs in the background, and that works. Now I want to be able to control it from the main app (which is based on Django).

I thought about something like:

async with multiplex_socket as active_socket:
    while True:
        if await EventListenerClass.new_event_fired():
            break  # or do something special
        result = await active_socket.recv()
        print(result)

I am thinking about writing an "EventListener" class that uses Redis Pub/Sub and listens for new messages on a channel. There will be several types of messages, for example "stop_websocket", "reload_with_updated_symbols_list", etc.

I need a solution that:

  • is fast enough to run every few seconds
  • lets me control the websockets

Am I on the right track with Redis?


Solution

  • The general workflow should include Redis or a similar Pub/Sub system that coordinates messaging between the two applications. If it were all one application, you would use mutex locking to read data from a shared space, which serves essentially the same function as a Pub/Sub system like Redis.

    I'm familiar with this same Redis approach from a Python REST API described with Swagger (OpenAPI), where Redis becomes one method of checking state. There are performance concerns you might need to address if you need low-latency, high-bandwidth requests, but Redis in a small system or in a VM can generally respond within milliseconds of an update.

    You could also design your own real-time Pub/Sub system that uses blocking network calls to smooth out concurrency issues, but Redis is a widely adopted solution that is available on a variety of platforms.

    Qt uses WebSockets to handle its CEF client integration, using bindings from C++ into JavaScript. This is another way of sending messages from a host application to a CEF window.
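To make the idea concrete, here is a minimal sketch of how the streaming loop could react to control messages without blocking on either source. It is not tied to Redis or Binance: the names `stream_with_control`, `handle`, and `demo` are made up for illustration, and plain `asyncio.Queue` objects stand in for both the websocket and the control channel. In a real setup a separate subscriber task (for example one built on a Redis Pub/Sub client) would push commands such as "stop_websocket" onto the control queue, and `recv` would be the socket's own `recv` method. That wiring is assumed here, not shown.

```python
import asyncio

# Command names taken from the question; anything else here is illustrative.
STOP = "stop_websocket"
RELOAD = "reload_with_updated_symbols_list"


async def stream_with_control(recv, control, handle):
    """Pass stream messages to `handle` until a control command arrives.

    recv    -- zero-argument callable returning an awaitable for the next
               stream message (e.g. a socket's recv method)
    control -- asyncio.Queue that another task fills with commands
    handle  -- plain function called with every stream message
    Returns the command that interrupted the stream.
    """
    recv_task = asyncio.create_task(recv())
    ctrl_task = asyncio.create_task(control.get())
    try:
        while True:
            # Wake up as soon as either a stream message or a command is ready.
            done, _ = await asyncio.wait(
                {recv_task, ctrl_task}, return_when=asyncio.FIRST_COMPLETED
            )
            if ctrl_task in done:
                # Commands take priority; a message that arrived at the very
                # same moment is dropped in this sketch.
                return ctrl_task.result()
            handle(recv_task.result())
            recv_task = asyncio.create_task(recv())
    finally:
        # Cancelling an already finished task is a no-op.
        recv_task.cancel()
        ctrl_task.cancel()


async def demo():
    data = asyncio.Queue()     # stand-in for the websocket
    control = asyncio.Queue()  # stand-in for the Pub/Sub channel
    received = []
    for i in range(3):
        data.put_nowait({"tick": i})

    async def send_stop_later():
        await asyncio.sleep(0.05)
        control.put_nowait(STOP)

    stopper = asyncio.create_task(send_stop_later())
    command = await stream_with_control(data.get, control, received.append)
    await stopper
    return command, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The caller decides what to do with the returned command: break out of the `async with` block on a stop command, or rebuild the multiplex socket with a new symbols list on a reload command. Because the loop waits on both sources at once, a command is noticed immediately rather than only after the next websocket message arrives.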