Process websocket stream while downloading multiple files using aiohttp


I'm following instructions (here) to mirror multiple orderbooks on Binance Exchange on my local machine.

Suppose, for simplicity, that I want to mirror orderbooks for 2 symbols: ETHBTC and DOGEBTC (in reality it's 350+).

First I have to buffer the websocket order-update streams:

  • wss://stream.binance.com:9443/stream?streams=ETHBTC@depth@100ms
  • wss://stream.binance.com:9443/stream?streams=DOGEBTC@depth@100ms

Now I have to download snapshots:

As soon as I have the snapshot for a symbol, I apply the buffered updates to it (the buffer keeps filling while the download runs), yielding a STATE.

After that, all order-updates can simply be applied to the state.
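To make the buffer-then-snapshot step concrete, here is a minimal sketch of it. It assumes the field names from Binance's documented diff-depth format (`u` for the final update ID, `b`/`a` for bid/ask levels, `lastUpdateId` in the snapshot) and takes the inner `data` object of each stream message. The function names `apply_update` and `build_state` are made up for illustration, and the continuity checks a production mirror would need are left out.

```python
def apply_update(book, update):
    """Apply one depth update to a book shaped like {"bids": {price: qty}, "asks": {...}}."""
    for side, key in (("bids", "b"), ("asks", "a")):
        for price, qty in update[key]:
            if float(qty) == 0:
                book[side].pop(price, None)  # zero quantity removes the price level
            else:
                book[side][price] = qty      # otherwise set the new absolute quantity


def build_state(snapshot, buffered):
    """Combine a snapshot with the updates buffered so far into a live book."""
    last_id = snapshot["lastUpdateId"]
    book = {
        "bids": dict(snapshot["bids"]),  # [[price, qty], ...] -> {price: qty}
        "asks": dict(snapshot["asks"]),
        "last_id": last_id,
    }
    for update in buffered:
        if update["u"] <= last_id:
            continue  # already reflected in the snapshot, so skip it
        apply_update(book, update)
        book["last_id"] = update["u"]
    return book
```

Once `build_state` has returned, each later update for that symbol can go straight through `apply_update`.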

For the updates stream I can do:

        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(URL) as wsock:
                async for msg in wsock:
                    # Only TEXT frames carry a JSON payload
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        J = json.loads(msg.data)
                        symbol = J['data']['s']

                        process_update(symbol, J)

But how can I, once the first update has come in, initiate downloading the snapshot, with a completion handler that will process it, in such a way as to not interrupt the stream?

If I'm tracking 300 symbols, that's 300 downloads happening at the same time.

I found resources on async downloading of multiple files, but I cannot see how to integrate this with the requirement of processing the stream.

I could always do the downloads in a separate thread, but isn't this fighting the architecture goals of aiohttp?


Solution

  • Answer thanks to graingert on IRC Freenode #python 🙏

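    import json
    
    import aiohttp
    import anyio
    import anyio.to_thread
    
    
    # URL, url, do_something and process_update are placeholders from the question.
    async def foo():
        async def download(symbol):
            # Runs as its own task, so the stream loop below is not blocked.
            async with session.get(f"{url}/{symbol}") as resp:
                await do_something(resp)
    
        async with aiohttp.ClientSession() as session, session.ws_connect(
            URL
        ) as wsock, anyio.create_task_group() as tg:
            async for msg in wsock:
                # Only TEXT frames carry a JSON payload
                if msg.type == aiohttp.WSMsgType.TEXT:
                    J = json.loads(msg.data)
                    symbol = J["data"]["s"]
    
                    # Note: this starts a download for every message received.
                    tg.start_soon(download, symbol)
                    # Run the synchronous handler in a worker thread.
                    await anyio.to_thread.run_sync(process_update, symbol, J)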
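The question asks for the download to start only on the first update per symbol, and worries about 300 downloads at once. Below is a rough, network-free sketch of that pattern using only the standard library's `asyncio`. `fake_stream` and `fetch_snapshot` are hypothetical stand-ins for the websocket and the HTTP request, and the `max_downloads` cap is an assumption added here, not part of the answer above.

```python
import asyncio


async def fake_stream(symbols, rounds):
    """Stand-in for the websocket: yields (symbol, update_id) pairs."""
    for update_id in range(rounds):
        for symbol in symbols:
            await asyncio.sleep(0)  # yield to the event loop, like a socket read
            yield symbol, update_id


async def fetch_snapshot(symbol, limiter, snapshots):
    """Stand-in for the HTTP download; the semaphore caps concurrent requests."""
    async with limiter:
        await asyncio.sleep(0.01)  # pretend network latency
        snapshots[symbol] = f"snapshot-{symbol}"


async def mirror(symbols, rounds=3, max_downloads=2):
    limiter = asyncio.Semaphore(max_downloads)
    snapshots = {}                      # symbol -> downloaded snapshot
    buffers = {s: [] for s in symbols}  # symbol -> updates seen so far
    tasks = {}                          # symbol -> download task

    async for symbol, update_id in fake_stream(symbols, rounds):
        buffers[symbol].append(update_id)
        if symbol not in tasks:
            # First update for this symbol: start the download in the
            # background and go straight back to reading the stream.
            tasks[symbol] = asyncio.create_task(
                fetch_snapshot(symbol, limiter, snapshots)
            )

    await asyncio.gather(*tasks.values())  # wait for any unfinished downloads
    return snapshots, buffers
```

Calling `asyncio.run(mirror(["ETHBTC", "DOGEBTC"]))` runs the whole thing. In a real version the body of `fetch_snapshot` would be the `session.get(...)` call, and the same task could apply the buffered updates once the response arrives, so no threads are needed for the downloads themselves.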