Search code examples
pythonqueuepython-asyncio

Python update queue with several asyncio


I want to update a single queue from several asyncio coroutines. I receive data from each of the providers A, B and C (each one uses a websocket inside a `while True` loop), and I want to put that data into a queue so that all the providers write to the same queue. I suspect I may need multithreading or something similar, but I can't find the right way to do it.

    if __name__ == '__main__':
        global_queue = queue.Queue()
        asyncio.run(A_Orderbook.data_stream(global_queue))
        asyncio.run(B_Orderbook.data_stream(global_queue))
        asyncio.run(C_Orderbook.data_stream(global_queue))
        print(global_queue.qsize())

Thanks


Solution

  • You can do it the following way:

    import asyncio
    
    
    async def worker(worker_name: str, q: asyncio.Queue):
        """Produce five labelled items for the consumer.

        The items are the strings "<worker_name>-1" through "<worker_name>-5";
        each one is put on ``q`` after a one-second pause.
        """
        seq = 1
        while seq <= 5:
            # Simulated delay before each item is produced.
            await asyncio.sleep(1)
            label = f"{worker_name}-{seq}"
            await q.put(label)
            seq += 1
    
    
    async def consumer(q: asyncio.Queue):
        """Take items off the queue forever, printing each one.

        After getting an item the coroutine pauses for one second, prints the
        item, and then marks it as done on the queue. It never returns on its
        own; it runs until it is cancelled.
        """
        while True:
            job = await q.get()
            await asyncio.sleep(1)
            print(job)
            # Report completion so that q.join() can unblock once every
            # queued item has been handled.
            q.task_done()
    
    
    async def main_wrapper():
        """Main function - entry point of our async app.

        Starts one consumer task, runs four workers ("w1".."w4") concurrently,
        waits until every queued item has been marked done, then stops the
        consumer and prints a completion message.
        """
        q = asyncio.Queue()
        # Keep a reference to the task: the event loop only holds a weak
        # reference to it, so an unreferenced task may be garbage-collected
        # before it finishes.
        consumer_task = asyncio.create_task(consumer(q))
        # Run all workers concurrently and wait for them to finish producing.
        await asyncio.gather(*[worker(f"w{i}", q) for i in range(1, 5)])
        # Block until task_done() has been called for every item that was put.
        await q.join()
        # The consumer loops forever, so stop it explicitly once the queue is
        # drained; return_exceptions=True collects its CancelledError instead
        # of raising it here.
        consumer_task.cancel()
        await asyncio.gather(consumer_task, return_exceptions=True)
        print("All DONE !")
    
    # Script entry point: start an event loop and run the app to completion.
    if __name__ == "__main__":
        asyncio.run(main_wrapper())