Tags: python, multithreading, python-asyncio, telegram-bot, python-telegram-bot

thread.join() blocks an async function


I am developing a Telegram bot using python-telegram-bot. It works like a stock screener: it analyzes the market at a given interval and sends the results to the users who are subscribed to the indicator. The problem is that I do not want the bot to be blocked while it is analyzing the market (it fetches data and does a lot of computations), because users interact with it continuously. So I thought I needed to do the analysis in a different thread, but I can't make it work with asyncio.

Here is a simplified example:

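import asyncio
import threading

# users, results, compute_next_time() and buildBot() are defined elsewhere in the bot.

async def run_screener(bot):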
    while True:
    
        async def heavy_computations():
            for i in range(5):
                await asyncio.sleep(2)
                print("Doing computations")

        compute = threading.Thread(target=lambda: asyncio.run(heavy_computations()))
        compute.start()
        compute.join()  #   <--- This is blocking the bot
        
        # Computations are done, now send the results with the bot
        for user_id in users:
            await bot.send_message(text=results, chat_id=user_id)

        await asyncio.sleep(compute_next_time())
        

async def main():
    application = buildBot()
    
    async with application:
        await application.start()
        await application.updater.start_polling()
        
        await run_screener(application.bot)
        
        await application.updater.stop()
        await application.stop()

asyncio.run(main())
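The bot freezes because compute.join() is an ordinary blocking call made on the event loop's own thread: until the worker thread finishes, the loop cannot run any other task, including the tasks python-telegram-bot uses for polling and handling updates. The sketch below reproduces this with the standard library only; ticker() is a hypothetical stand-in for the bot's update handling, and the 0.1 s / 0.5 s timings are arbitrary assumptions.

```python
import asyncio
import threading
import time

ticks = []  # timestamps recorded by the stand-in "bot" task


async def ticker():
    # Hypothetical stand-in for the bot handling updates: ticks every 0.1 s.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.1)


def blocking_work():
    # Emulates the heavy computations running in the worker thread.
    time.sleep(0.5)


async def main():
    ticker_task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # let the ticker record its first tick

    start = time.monotonic()
    worker = threading.Thread(target=blocking_work)
    worker.start()
    worker.join()  # blocks the event loop thread, so ticker() cannot run
    end = time.monotonic()

    ticker_task.cancel()
    try:
        await ticker_task
    except asyncio.CancelledError:
        pass
    # Number of ticks recorded while the thread was being joined.
    return sum(start < t < end for t in ticks)


ticks_during_join = asyncio.run(main())
print(ticks_during_join)  # 0: the loop was frozen for the whole 0.5 s
```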

Solution

  • To run a blocking (non-asyncio) function and await its completion without blocking other asyncio tasks, you can use the event loop method loop.run_in_executor. You can execute your worker function in either a multiprocessing pool or a multithreading pool. Since you are using multithreading, we can either run the task in the default thread pool provided by the asyncio loop (by passing None as the executor) or create a custom pool, as we do here (in this case a pool size of 1 is enough). Note that run_in_executor expects an ordinary function, not a coroutine function, so heavy_computations should become a plain def that does its work with blocking calls instead of await:

    import asyncio
    import concurrent.futures
    import time
    
    async def task1():
        # Emulate doing some processing:
        await asyncio.sleep(2)
        return 1
    
    async def task2():
        # Use a ProcessPoolExecutor if the processing is CPU-intensive:
        with concurrent.futures.ThreadPoolExecutor(1) as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor, io_task)
        return result
    
    def io_task():
        # Emulate doing some processing:
        time.sleep(2)
        return 2
    
    async def main():
        results = await asyncio.gather(task1(), task2())
        print(results)
    
    asyncio.run(main())
    

    Prints:

    [1, 2]
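Applied to the shape of the question's screener loop, a minimal sketch could look like the following. It assumes heavy_computations has been turned into a plain blocking function (compute_results here, a hypothetical helper returning the screener output), uses the loop's default thread pool by passing None as the executor, and replaces the Telegram bot with a hypothetical ticker() task so it runs without a bot token. On Python 3.9+, await asyncio.to_thread(compute_results) does the same thing using the default executor.

```python
import asyncio
import time

ticks = []  # timestamps recorded by the stand-in "bot" task


async def ticker():
    # Hypothetical stand-in for the bot answering users while the screener works.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.1)


def compute_results():
    # Hypothetical blocking version of heavy_computations(): plain def, no await.
    time.sleep(0.5)
    return ["signal A", "signal B"]


async def run_screener_once():
    loop = asyncio.get_running_loop()
    # None selects the event loop's default ThreadPoolExecutor.
    results = await loop.run_in_executor(None, compute_results)
    # Here the real bot would loop over users and await bot.send_message(...).
    return results


async def main():
    ticker_task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # let the ticker record its first tick

    start = time.monotonic()
    results = await run_screener_once()  # the loop stays free while waiting
    end = time.monotonic()

    ticker_task.cancel()
    try:
        await ticker_task
    except asyncio.CancelledError:
        pass
    # Ticks recorded while the computation was running in the thread pool.
    return results, sum(start < t < end for t in ticks)


results, ticks_during_work = asyncio.run(main())
print(results, ticks_during_work)
```

Unlike compute.join(), awaiting the executor future hands control back to the event loop, so the stand-in bot keeps ticking during the 0.5 s of work.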
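The comment in task2 suggests a ProcessPoolExecutor when the processing is CPU-intensive. In CPython, pure-Python number crunching in a worker thread still competes with the event loop thread for the global interpreter lock, so a separate process is usually the safer choice for heavy computations. A minimal sketch, assuming the computation can be written as a top-level function (cpu_heavy here is hypothetical) whose arguments and return value are picklable:

```python
import asyncio
import concurrent.futures


def cpu_heavy(n):
    # Hypothetical CPU-bound screener step. It must be a top-level function
    # so the worker process can unpickle a reference to it.
    return sum(i * i for i in range(n))


async def compute_in_process(n):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        # Extra positional arguments after the callable are passed to it.
        return await loop.run_in_executor(executor, cpu_heavy, n)


if __name__ == "__main__":
    # The __main__ guard is required on platforms that start workers with "spawn".
    print(asyncio.run(compute_in_process(1_000)))  # 332833500
```

In a long-running bot you would probably create the pool once at startup and reuse it across screener iterations rather than starting a new worker process each time.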