I'm rewriting the parser to make it convenient to use in a telegram bot. I need to run several threads from the function that will do their job.
The problem is that while the threads from multiprocessing are running, dispatcher cannot respond to new chat messages ( dp just afk )
from aiogram import Dispatcher, Bot, executor
from multiprocessing import Process
from time import sleep
bot_token = "token"
bot = Bot(token=bot_token)
dp = Dispatcher(bot)
allowed_users = [My_user_id]
class work:
@staticmethod
def reparse(user_id, data):
sleep(10) # imitation of work
return
@staticmethod
async def parsing(user_id, message):
await message.answer('Parsing started')
# There bot do some job
# ...
some_data = [124215, 123543, 346457, 347347]
processes = []
for i in range(4):
args = [user_id, some_data[i]]
_process = Process(target=work.reparse, args=(*args,))
processes.append(_process)
_process.start()
for process in processes:
process.join()
await message.answer('Work completed')
return
if __name__ == '__main__':
@dp.message_handler(commands=['check_dispatcher'])
async def check_dispatcher(message):
await message.answer('ANSWER')
@dp.message_handler(commands=['work_start'])
async def parser(message):
await work.parsing(message.chat.id, message)
print('STARTED')
executor.start_polling(dp, skip_updates=True)
What I've already tried:
for process in processes:
process.join()
But this does not work and the dispatcher is idle while the processes are running. I also tried the option without process.join(). I checked if the thread is alive using process.is_alive() and if all threads have completed their work, then continue, but this also did not work
@staticmethod
async def parsing(user_id, message):
await message.answer('Parsing started')
# There bot do some job
# ...
some_data = [124215, 123543, 346457, 347347]
processes = []
for i in range(4):
args = [user_id, some_data[i]]
_process = asyncio.run(create_process(*args))
processes.append(_process)
#
# waiting for processes end
#
await message.answer('Work completed')
return
@staticmethod
async def create_process(args):
_process = Process(target=work.reparse, args=(*args,))
_process.start()
return _process
_process = threading.Thread(target=asyncio.run, args=(work.reparse(*args),))
The dispatcher responds only after the processes are completed
After adding the code block below, the problem was fixed. Now dispatcher can respond to new messages while threads is alive
for thread in processes:
while thread.is_alive():
await asyncio.sleep(0.1)
thread.join()
The final working version of my program below
from aiogram import Dispatcher, Bot, executor
from multiprocessing import Process
import threading
from time import sleep
import asyncio
bot_token = "TOKEN"
allowed_users = [USER_ID]
class work:
@staticmethod
def reparse(user_id, data):
sleep(10) # imitation of work
return
@staticmethod
async def parsing(user_id, message):
await message.answer('Parsing started')
# There bot do some job
# ...
some_data = [124215, 123543, 346457, 347347]
processes = []
for i in range(4):
args = [user_id, some_data[i]]
_process = threading.Thread(target=work.reparse, args=(*args,))
processes.append(_process)
_process.start()
for thread in processes:
while thread.is_alive():
await asyncio.sleep(0.1)
thread.join()
await message.answer('Work completed')
return
@staticmethod
async def create_process(work_type,user_id,args):
_process = Process(target=work.reparse, args=(*args,))
_process.start()
return _process
if __name__ == '__main__':
bot = Bot(token=bot_token)
dp = Dispatcher(bot)
@dp.message_handler(commands=['check_dispatcher'])
async def check_dispatcher(message):
await message.answer('ANSWER')
@dp.message_handler(commands=['work_start'])
async def parser(message):
await work.parsing(message.chat.id, message)
print('STARTED')
asyncio.run(executor.start_polling(dp, skip_updates=True))