Tags: python, asynchronous, concurrency, python-asyncio, python-telegram-bot

How do I run my program concurrently with python-telegram-bot?


I wrote a channel management bot with PTB (python-telegram-bot). The bot fetches the desired posts from the internet, processes them, and then sends them to each of its channels.

What I want, but don't know how to do, is a function that creates a separate thread for each channel and re-checks the channel list every 60 seconds, so that a newly added channel gets its own thread as well.

I already tried this with threading, but I got an event loop error.

    builder = ApplicationBuilder()
    builder.connection_pool_size(50000)
    builder.get_updates_connection_pool_size(50000)
    builder.pool_timeout(100)
    builder.get_updates_pool_timeout(100)
    app = builder.token(TOKEN).get_updates_http_version('1.1').http_version('1.1').build()

    thread_list = []

    async def schedule():
        channel_list = mydb.select("channels", {"visibility": 1, "active": "activate"})
        for one in channel_list:
            if one['username'] not in thread_list:
                app.create_task(schedule_work(one['username']))
                thread_list.append(one['username'])


    scheduler = AsyncIOScheduler()
    scheduler.add_job(schedule,'interval', seconds=60, name="main")
    scheduler.start()
    app.run_polling()

In the code above:

  • The schedule function is executed every 60 seconds.
  • The schedule_work function fetches posts from the database and sends them to the channels. To send the posts faster, I want this function to run separately for each channel.

When I run the above code, it starts posting in the channels, but if I interact with the bot at the same time, I get the following error:

.
.
  File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
    await fut
asyncio.exceptions.CancelledError
.
.
  File "/root/abaradmin/venv/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 119, in __exit__
    raise TimeoutError
TimeoutError
.
.
.
  File "/root/abaradmin/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.PoolTimeout
.
.
  File "/root/abaradmin/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.PoolTimeout
.
.
  File "/root/abaradmin/venv/lib/python3.10/site-packages/telegram/request/_httpxrequest.py", line 226, in do_request
    raise TimedOut(
telegram.error.TimedOut: Pool timeout: All connections in the connection pool are occupied. Request was *not* sent to Telegram. Consider adjusting the connection pool size or the pool timeout.

I have been dealing with this problem for 3 days, but I have not found a solution. I hope I have given a complete explanation.

Thank you for your time


Edit:

schedule_work function:

async def schedule_work(channel_usr):
    #time.sleep(60 * 5)
    sent_messages = {}
    start_time = datetime.datetime.now()

    while True:
        file_url = mydb.select('setting', {'section': 'channels_folder'})[1]['value']
        file_dir = mydb.select('setting', {'section': 'channels_folder'})[0]['value']
        sent_messages[channel_usr] = 0

        # datas
        channel_username = channel_usr
        channel = mydb.select('channels', {"username": f"{channel_username}"})

        schedule_channel = mydb.select('schedule', {"is_sent": 0, "channel": channel_username})

        for post in schedule_channel:
            sending_type = channel[0]['sending_type']
            sleep_status = channel[0]['sleep_status']
            sleep_time = channel[0]['sleep_time']
            channel_id = post['channel_id']
            channel_admin = post['channel_admin']
            post_type = post['type']
            sleep_inquiry = is_sleep(sleep_time)
            if sleep_status != '✅ فعال' or sleep_inquiry == False:
                if post_type == 'music':
                    # datas
                    post_name = post['post_name']
                    mp3_file = str(post['mp3_file'])
                    mp3_name = str(post['mp3_file']).split('/')[-1]
                    mp3_file_128 = f'{file_dir}/{channel_username}/' + str(post['mp3_file']).split('/')[-1].split('.')[0] + ' [128].mp3'
                    voice = str(post['voice'])
                    cover_file = str(post['cover_file'])
                    cover_name = str(post['cover_file']).split('/')[-1]
                    caption = post['caption']
                    music_file_cap = ""
                    if post['file_cap']:
                        music_file_cap = post['file_cap']
                    voice_file_cap = ""
                    if post['voice_cap']:
                        voice_file_cap = post['voice_cap']

                    if sending_type == 'automatic':
                        sent_messages[channel_username] += 3
                        # sending post
                        try:
                            await bot.send_photo(channel_id, f"{file_url}/{channel_username}/{cover_name}", caption=caption)
                            time.sleep(0.3)
                            await bot.send_voice(channel_id, open(voice, 'rb'), caption=voice_file_cap)
                            time.sleep(0.3)
                            await bot.send_audio(channel_id, f"{file_url}/{channel_username}/{mp3_name}", caption=music_file_cap)
                            time.sleep(0.3)
                            mydb.update('schedule', {'is_sent': 1, 'sent_date': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, {'id': post["id"]})

                            # Deleting sent files
                            os.system(f'rm -rf "{mp3_file}" "{mp3_file_128}"')
                            os.system(f'rm -rf "{voice}" "{cover_file}"')
                        except telegram.error.RetryAfter as e:
                            log_writer(format_exc())
                            time.sleep(int(e.retry_after))
                        except telegram.error.TimedOut:
                            log_writer(format_exc())
                            time.sleep(60)
                        except:
                            log_writer(format_exc())
                            await bot.send_message(log_channel, f'''Error in sending post at music automatic section:
                            
{format_exc()}''')
                    else:
                        if not post['admin_received']:
                            sent_messages[channel_username] += 1
                            # sending post to admin for approval
                            try:
                                # Approval keyboard
                                post_approval_keyboard = [
                                    [
                                        InlineKeyboardButton('🚫 حذف', callback_data=f'delete666_{post["id"]}'),
                                        InlineKeyboardButton('✅ انتشار', callback_data=f'share666_{post["id"]}'),
                                    ],
                                    [
                                        InlineKeyboardButton('🚫 حذف همه', callback_data=f'delete_all_{channel_username}'),
                                        InlineKeyboardButton('✳️ انتشار همه', callback_data=f'share_all_{channel_username}'),
                                    ]
                                ]
                                post_approval = await bot.send_photo(channel_admin, f"{file_url}/{channel_username}/{cover_name}", caption=f"""تنظیمات نوع ارسال کانال @{channel_username} شما روی دستی تنظیم شده،

موزیک «{post_name}» توی کانال منتشر بشه؟""", reply_markup=InlineKeyboardMarkup(post_approval_keyboard))
                                time.sleep(0.3)
                                mydb.update('schedule', {'admin_received': 1, "is_confirmed": 0, "is_deleted": 0, 'approval_msg_id': post_approval['message_id']}, {'id': post["id"]}, True)
                            except:
                                log_writer(format_exc())
                                await bot.send_message(log_channel, f'''Error in sending post at music automatic section:

{format_exc()}''')

                elif post_type == 'video':
                    # bla bla bla
                
                time.sleep(0.7)
            end_time = datetime.datetime.now()
            finish_time = end_time - start_time
            print()
            print()
            print(channel_username, '>>>>>>>>', 'finish time:', finish_time, 'Sent message: ', sent_messages[channel_username] )
            print()
            print()
            one_minute = datetime.timedelta(minutes=1)
            if int(sent_messages[channel_username]) >= 17:
                if finish_time < one_minute:
                    print()
                    print()
                    print('********************* Sleep Mode *********************' )
                    print()
                    print()
                    time.sleep(40)
                    sent_messages[channel_username] = 0
                    start_time = datetime.datetime.now()
        time.sleep(5)

Solution

  • I see several points in your code snippets that can be improved:

    1. You manually instantiate bot = telegram.Bot(token=TOKEN) and use it as a global variable. This has two downsides. Firstly, that bot object does not receive the pool size and timeout settings that you specify with the ApplicationBuilder. This is the root cause of the exception shown above. If you want the bot object to have customized settings, you'll have to use the request parameter explicitly and pass the corresponding settings to HTTPXRequest. Secondly, there is usually no need to manually build an instance of telegram.Bot because python-telegram-bot already does that for you. This instance is available as app.bot, or as context.bot in handler/job/error callbacks. It does get the settings that you specify via the builder pattern, and I highly recommend using it wherever possible.
    2. Your schedule_work function is not asyncio-friendly because it makes heavy use of time.sleep, which is blocking in the asyncio sense: while it sleeps, the event loop cannot run any other task. At the very least, I urge you to use asyncio.sleep instead. Moreover, schedule_work uses a while True loop, which is often a strong indicator of bad design. Both the while True loop and the sleeping can likely be replaced by scheduling callbacks to run at a later time. Note that you are already using APScheduler's AsyncIOScheduler, so you are apparently aware of this possibility already ;)
    3. You use AsyncIOScheduler manually. However, PTB already provides a wrapper around the APScheduler library via the JobQueue. Using this wrapper has the benefit that it ties in well with PTB's infrastructure: for example, context.bot is already available in job callbacks, so you do not need a manually defined global bot variable :)
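
    To illustrate the second point, here is a minimal standard-library sketch (no PTB involved) of how time.sleep stalls every task on the event loop while asyncio.sleep does not. The function names and channel names are hypothetical, and the sleeps merely stand in for the pauses between send calls in schedule_work.

```python
import asyncio
import time


async def send_blocking(channel: str, delay: float) -> None:
    # time.sleep blocks the whole event loop: no other task can run meanwhile.
    time.sleep(delay)


async def send_cooperative(channel: str, delay: float) -> None:
    # asyncio.sleep suspends only this task and lets the other tasks proceed.
    await asyncio.sleep(delay)


async def timed(worker, channels: list[str], delay: float) -> float:
    """Run one worker per channel concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(worker(name, delay) for name in channels))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    channels = ["channel_a", "channel_b", "channel_c"]
    blocking = await timed(send_blocking, channels, 0.2)
    cooperative = await timed(send_cooperative, channels, 0.2)
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = asyncio.run(main())
    print(f"time.sleep:    {blocking:.2f}s")
    print(f"asyncio.sleep: {cooperative:.2f}s")
```

    With three channels, the blocking variant should take roughly three times the delay, because the sleeps run one after another, while the cooperative variant should take roughly one delay. In a running bot, the same blocking also stalls the handlers that answer users.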

    Disclaimer: I'm currently the maintainer of python-telegram-bot.
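
    As a rough illustration of replacing the while True loop with scheduled runs, the following standard-library sketch re-reads the channel list on every run and processes all channels concurrently, so a newly added channel is picked up on the next run without any bookkeeping. Everything here is an assumption for illustration: active_channels stands in for the database query, post_pending for the sending logic, and the loop in main only plays the role of the scheduler. With PTB, the repeating run would instead be registered on the JobQueue (for example with run_repeating), and the callback would use context.bot.

```python
import asyncio

# Hypothetical stand-in for mydb.select("channels", ...) from the question.
active_channels = ["channel_a", "channel_b"]
sent_log: list[tuple[int, str]] = []


async def post_pending(tick: int, channel: str) -> None:
    """Send the pending posts of one channel (stubbed with a short await)."""
    await asyncio.sleep(0.01)  # stands in for awaited bot.send_* calls
    sent_log.append((tick, channel))


async def run_tick(tick: int) -> None:
    """One scheduled run: re-read the channel list, then process all channels concurrently."""
    channels = list(active_channels)
    await asyncio.gather(*(post_pending(tick, name) for name in channels))


async def main(ticks: int = 2, interval: float = 0.05) -> None:
    # This loop only plays the role of the scheduler in this sketch.
    for tick in range(ticks):
        await run_tick(tick)
        if tick == 0:
            active_channels.append("channel_c")  # a channel added at runtime
        await asyncio.sleep(interval)


if __name__ == "__main__":
    asyncio.run(main())
    print(sent_log)
```

    The first run handles the two initial channels and the second run also handles the channel added in between. Because each run finishes and returns, no per-channel thread list or endless loop is needed.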