Search code examples
python multithreading multiprocessing aiogram

How to combine python aiogram with multiprocessing | Python


I'm rewriting the parser to make it convenient to use in a telegram bot. I need to run several threads from the function that will do their job.

The problem is that while the processes started with multiprocessing are running, the dispatcher cannot respond to new chat messages (dp is simply unresponsive).

from aiogram import Dispatcher, Bot, executor
from multiprocessing import Process
from time import sleep

# Placeholder credentials/config for the example; replace before running.
bot_token = "token"
bot = Bot(token=bot_token)
dp = Dispatcher(bot)
# NOTE: `My_user_id` is a placeholder that is not defined in this snippet, and
# `allowed_users` is not read anywhere in the code shown here.
allowed_users = [My_user_id]


class work:
    """Parsing job for the bot (version from the question that blocks the dispatcher)."""

    @staticmethod
    def reparse(user_id, data):
        """Stand-in for one unit of parsing work: blocks for 10 seconds, returns None."""
        sleep(10)  # imitation of work
        return

    @staticmethod
    async def parsing(user_id, message):
        """Notify the chat, run four `reparse` worker processes, then notify again.

        `message` only needs an awaitable `answer(text)` method here.
        """
        await message.answer('Parsing started')
        # The bot does some preparatory work here
        # ...
        some_data = [124215, 123543, 346457, 347347]
        processes = []
        # Start one worker process per data item (the count 4 is hard-coded
        # to match the length of `some_data`).
        for i in range(4):
            args = [user_id, some_data[i]]
            _process = Process(target=work.reparse, args=(*args,))
            processes.append(_process)
            _process.start()
        # NOTE: Process.join() is a blocking call and nothing is awaited in this
        # loop, so the coroutine never yields to the event loop until every
        # worker has exited. This is the cause of the unresponsive dispatcher
        # described in the question.
        for process in processes:
            process.join()
        await message.answer('Work completed')
        return


# Script entry point: register the command handlers on `dp`, then start polling.
if __name__ == '__main__':
    # /check_dispatcher: replies with a fixed text; used to see whether the
    # dispatcher is still handling messages while a job is running.
    @dp.message_handler(commands=['check_dispatcher'])
    async def check_dispatcher(message):
        await message.answer('ANSWER')


    # /work_start: runs the parsing job, passing the chat id as `user_id`.
    @dp.message_handler(commands=['work_start'])
    async def parser(message):
        await work.parsing(message.chat.id, message)


    print('STARTED')
    executor.start_polling(dp, skip_updates=True)

What I've already tried:

  1. Initially I used
for process in processes:
            process.join()

But this does not work and the dispatcher is idle while the processes are running. I also tried the option without process.join(). I checked if the thread is alive using process.is_alive() and if all threads have completed their work, then continue, but this also did not work

  1. I have also tried this kind of execution
# Second attempt (method of `work`; indentation is as pasted in the question).
# NOTE(review): the `asyncio.run(create_process(*args))` line below has several
# problems:
#   * `asyncio.run()` is documented to raise RuntimeError when called while an
#     event loop is already running in the same thread, which is the case
#     inside a coroutine handler;
#   * `create_process` is referenced without the `work.` prefix;
#   * it is called with two positional arguments, while the `create_process`
#     shown with this attempt declares a single `args` parameter.
@staticmethod
    async def parsing(user_id, message):
        await message.answer('Parsing started')
        # The bot does some preparatory work here
        # ...
        some_data = [124215, 123543, 346457, 347347]
        processes = []
        for i in range(4):
            args = [user_id, some_data[i]]
            _process = asyncio.run(create_process(*args))
            processes.append(_process)
        #
        # waiting for processes end (the waiting code was omitted in the question)
        #
        await message.answer('Work completed')
        return
# Helper for the second attempt: starts one `work.reparse` process and returns it.
# It declares a single `args` parameter (an iterable of positional arguments for
# `reparse`). The body contains no `await`, so declaring it `async` does not by
# itself make waiting for the process non-blocking.
@staticmethod
    async def create_process(args):
        _process = Process(target=work.reparse, args=(*args,))
        _process.start()
        return _process
  1. I also tried the following, but the dispatcher is still unresponsive while any process is working
# NOTE: `work.reparse(*args)` is a call expression inside the `args` tuple, so it
# is evaluated immediately in the current thread (blocking it for the whole
# sleep) before the Thread object is even created. The thread's target would
# then receive its return value (None) instead of a coroutine.
_process = threading.Thread(target=asyncio.run, args=(work.reparse(*args),))

The dispatcher responds only after the processes are completed


Solution

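  • After adding the code block below, the problem was fixed. Now the dispatcher can respond to new messages while the threads are alive, because `await asyncio.sleep(0.1)` hands control back to the event loop between checks instead of blocking it inside `join()`.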

    # Wait for every worker without blocking the event loop: poll is_alive()
    # and yield to the loop for 0.1 s between checks.
    for thread in processes:
        while thread.is_alive():
            await asyncio.sleep(0.1)
        # The worker has already finished here, so join() returns right away.
        thread.join()
    

    The final working version of my program is below:

    from aiogram import Dispatcher, Bot, executor
    from multiprocessing import Process
    import threading
    from time import sleep
    import asyncio
    
    # Placeholder values for the example; replace before running.
    bot_token = "TOKEN"
    # NOTE: `USER_ID` is a placeholder that is not defined in this snippet, and
    # `allowed_users` is not read anywhere in the code shown here.
    allowed_users = [USER_ID]
    
    
    class work:
        """Parsing job for the bot, run in worker threads so the dispatcher stays responsive."""

        @staticmethod
        def reparse(user_id, data):
            """Process one data item for ``user_id``; blocks the calling thread.

            The body is a stand-in for real work and simply sleeps for 10 seconds.
            Because ``time.sleep`` blocks, this must not be called directly from a
            coroutine running on the event loop. Returns ``None``.
            """
            sleep(10)  # imitation of work

        @staticmethod
        async def parsing(user_id, message):
            """Run ``reparse`` for every data item in parallel threads and report progress.

            Sends 'Parsing started', starts one worker thread per item, waits for
            all of them without blocking the event loop, then sends
            'Work completed'.

            Args:
                user_id: Identifier passed through to each ``reparse`` call.
                message: Object with an awaitable ``answer(text)`` method
                    (the incoming chat message in the handlers below).
            """
            await message.answer('Parsing started')
            # There bot do some job
            # ...
            some_data = [124215, 123543, 346457, 347347]

            # Iterate the data directly so the number of workers always matches
            # the number of items (no hard-coded count to keep in sync).
            threads = []
            for item in some_data:
                worker = threading.Thread(target=work.reparse, args=(user_id, item))
                worker.start()
                threads.append(worker)

            # Poll instead of calling join() on a live thread: join() would block
            # the event loop, while asyncio.sleep() yields to it so other
            # handlers keep running.
            for worker in threads:
                while worker.is_alive():
                    await asyncio.sleep(0.1)
                worker.join()  # already finished; returns immediately

            await message.answer('Work completed')

        @staticmethod
        async def create_process(work_type, user_id, args):
            """Start a separate process running ``reparse(*args)`` and return it.

            ``work_type`` and ``user_id`` are accepted but not used, and nothing
            in the code shown here calls this helper. The caller is responsible
            for waiting on the returned ``Process`` without blocking the event
            loop.
            """
            process = Process(target=work.reparse, args=tuple(args))
            process.start()
            return process
    if __name__ == '__main__':
        # Script entry point: build the bot, register handlers, start polling.
        bot = Bot(token=bot_token)
        dp = Dispatcher(bot)

        @dp.message_handler(commands=['check_dispatcher'])
        async def check_dispatcher(message):
            """Reply with a fixed text; shows the dispatcher is still responsive."""
            await message.answer('ANSWER')

        @dp.message_handler(commands=['work_start'])
        async def parser(message):
            """Run the parsing job, passing the chat id as ``user_id``."""
            await work.parsing(message.chat.id, message)

        print('STARTED')
        # executor.start_polling() is a regular blocking call that drives the
        # event loop itself. Wrapping it in asyncio.run() would hand asyncio.run
        # its return value (not a coroutine) once polling stops, which raises.
        executor.start_polling(dp, skip_updates=True)