I have two .py files:
1. I have 4 workers. Each worker runs for 10 seconds, and the total execution time of the program is also 10 seconds, so all workers run simultaneously.
import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)
loop = asyncio.get_event_loop()


def work():
    # Blocking job; each call runs in its own worker process.
    sleep(10)


async def producer():
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    # The loop argument of as_completed() was removed in Python 3.10, so it is omitted.
    for f in asyncio.as_completed(tasks):
        results = await f
        await queue.put(results)


async def consumer():
    completed = 0
    while completed < num_jobs:
        job = await queue.get()
        completed += 1


if __name__ == '__main__':
    s = time()
    loop.run_until_complete(asyncio.gather(producer(), consumer()))
    print("duration", time() - s)
2. A Python Telegram bot that uses webhooks to receive messages from users. The bot receives a message and, after 10 seconds, sends an echo message back to the user. But this does not give me the same result as in the first case: here, messages are not processed simultaneously.
import asyncio
import logging
import os

from aiogram import Bot, types
from aiogram.dispatcher import Dispatcher
from aiogram.utils.executor import start_webhook

from bot_files.config import *  # expected to provide bot_token

bot = Bot(token=bot_token)
dp = Dispatcher(bot)

# webhook settings
WEBHOOK_HOST = 'https://2568-176-8-60-184.ngrok.io'
WEBHOOK_PATH = f'/webhook/{bot_token}'
WEBHOOK_URL = f'{WEBHOOK_HOST}{WEBHOOK_PATH}'

# webserver settings
WEBAPP_HOST = '0.0.0.0'
WEBAPP_PORT = int(os.getenv('PORT', default=5000))  # env values are strings


async def on_startup(dispatcher):
    await bot.set_webhook(WEBHOOK_URL, drop_pending_updates=True)


async def on_shutdown(dispatcher):
    await bot.delete_webhook()


@dp.message_handler()
async def echo(message: types.Message):
    await asyncio.sleep(10)
    await message.answer(message.text)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    start_webhook(
        dispatcher=dp,
        webhook_path=WEBHOOK_PATH,
        skip_updates=True,
        on_startup=on_startup,
        on_shutdown=on_shutdown,
        host=WEBAPP_HOST,
        port=WEBAPP_PORT,
    )
I want to put the @dp.message_handler() handler into loop.run_until_complete(asyncio.gather(producer())) in place of producer(), but I can't pass a decorator as a parameter. My overall question is: how can I process messages from many users simultaneously in a Python Telegram bot, as in the first case? I know how to do it with long polling. Is it possible to do it with webhooks?
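If your bot runs on webhooks, aiogram has a special decorator for processing messages from many users simultaneously, async_task: https://docs.aiogram.dev/en/latest/_modules/aiogram/dispatcher/dispatcher.html#Dispatcher.async_task. Its docstring shows it stacked directly beneath the handler decorator, that is, @dp.message_handler() first and @dp.async_task on the next line, above the handler function.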
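To illustrate the idea behind such a decorator, here is a minimal sketch that uses only the standard library. It is not aiogram's implementation: async_task_model, feed, run, DELAY, background and replies are all hypothetical names made up for this example, and feed is only a stand-in for something that delivers updates one at a time and waits for each handler call to return. The sketch assumes the decorator's effect is to schedule the handler as a task and return immediately.

```python
import asyncio
import functools
import time

DELAY = 0.2          # stand-in for the 10-second sleep in echo()
background = set()   # keeps references so running tasks are not garbage-collected
replies = []         # stand-in for message.answer()


def async_task_model(func):
    """Hypothetical model: schedule the handler as a task and return None at once."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        task = asyncio.create_task(func(*args, **kwargs))
        background.add(task)
        task.add_done_callback(background.discard)
    return wrapper


async def echo(text):
    # Slow handler: wait, then "answer" by recording the text.
    await asyncio.sleep(DELAY)
    replies.append(text)


async def feed(handler, updates):
    """Deliver updates one by one, waiting for each handler call to return."""
    start = time.perf_counter()
    for text in updates:
        await handler(text)
    # Wait for handlers that are still running in the background, if any.
    pending = list(background)
    if pending:
        await asyncio.gather(*pending)
    return time.perf_counter() - start


def run(handler, updates):
    """Run one scenario in a fresh event loop; return (replies, elapsed seconds)."""
    replies.clear()
    background.clear()
    elapsed = asyncio.run(feed(handler, updates))
    return list(replies), elapsed


if __name__ == '__main__':
    print("plain handler   ", run(echo, ["a", "b", "c"]))
    print("scheduled as task", run(async_task_model(echo), ["a", "b", "c"]))
```

With the plain handler the three waits happen one after another, so the elapsed time should be roughly three times DELAY. With the wrapped handler the waits overlap, so it should be close to a single DELAY.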