I wrote a channel management bot with PTB. The logic of this robot is that it takes the desired posts from the internet, processes them, and then sends that post to each of its channels.
What I want to do, but I don't know how, is to have a function that creates a separate thread for each channel and refreshes it every 60 seconds so that if a new channel is added, it creates a separate thread for that too.
I already did this with threading, but I got an event loop error.
builder = ApplicationBuilder()
builder.connection_pool_size(50000)
builder.get_updates_connection_pool_size(50000)
builder.pool_timeout(100)
builder.get_updates_pool_timeout(100)
app = builder.token(TOKEN).get_updates_http_version('1.1').http_version('1.1').build()
thread_list = []
async def schedule():
channel_list = mydb.select("channels", {"visibility": 1, "active": "activate"})
for one in channel_list:
if one['username'] not in thread_list:
app.create_task(schedule_work(one['username']))
thread_list.append(one['username'])
scheduler = AsyncIOScheduler()
scheduler.add_job(schedule,'interval', seconds=60, name="main")
scheduler.start()
app.run_polling()
In the part of my code that I put here:
When I run the above code, it starts posting in the channel, but if I work with the bot at the same time, I get the following error:
.
.
File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
await fut
asyncio.exceptions.CancelledError
.
.
File "/root/abaradmin/venv/lib/python3.10/site-packages/anyio/_core/_tasks.py", line 119, in __exit__
raise TimeoutError
TimeoutError
.
.
.
File "/root/abaradmin/venv/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc) from exc
httpcore.PoolTimeout
.
.
File "/root/abaradmin/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.PoolTimeout
.
.
File "/root/abaradmin/venv/lib/python3.10/site-packages/telegram/request/_httpxrequest.py", line 226, in do_request
raise TimedOut(
telegram.error.TimedOut: Pool timeout: All connections in the connection pool are occupied. Request was *not* sent to Telegram. Consider adjusting the connection pool size or the pool timeout.
I have been dealing with this problem for 3 days, but I have not found a solution. I hope I have given a complete explanation.
Thank you for your time
Edit:
schedule_work function:
async def schedule_work(channel_usr):
#time.sleep(60 * 5)
sent_messages = {}
start_time = datetime.datetime.now()
while True:
file_url = mydb.select('setting', {'section': 'channels_folder'})[1]['value']
file_dir = mydb.select('setting', {'section': 'channels_folder'})[0]['value']
sent_messages[channel_usr] = 0
# datas
channel_username = channel_usr
channel = mydb.select('channels', {"username": f"{channel_username}"})
schedule_channel = mydb.select('schedule', {"is_sent": 0, "channel": channel_username})
for post in schedule_channel:
sending_type = channel[0]['sending_type']
sleep_status = channel[0]['sleep_status']
sleep_time = channel[0]['sleep_time']
channel_id = post['channel_id']
channel_admin = post['channel_admin']
post_type = post['type']
sleep_inquiry = is_sleep(sleep_time)
if sleep_status != '✅ فعال' or sleep_inquiry == False:
if post_type == 'music':
# datas
post_name = post['post_name']
mp3_file = str(post['mp3_file'])
mp3_name = str(post['mp3_file']).split('/')[-1]
mp3_file_128 = f'{file_dir}/{channel_username}/' + str(post['mp3_file']).split('/')[-1].split('.')[0] + ' [128].mp3'
voice = str(post['voice'])
cover_file = str(post['cover_file'])
cover_name = str(post['cover_file']).split('/')[-1]
caption = post['caption']
music_file_cap = ""
if post['file_cap']:
music_file_cap = post['file_cap']
voice_file_cap = ""
if post['voice_cap']:
voice_file_cap = post['voice_cap']
if sending_type == 'automatic':
sent_messages[channel_username] += 3
# sending post
try:
await bot.send_photo(channel_id, f"{file_url}/{channel_username}/{cover_name}", caption=caption)
time.sleep(0.3)
await bot.send_voice(channel_id, open(voice, 'rb'), caption=voice_file_cap)
time.sleep(0.3)
await bot.send_audio(channel_id, f"{file_url}/{channel_username}/{mp3_name}", caption=music_file_cap)
time.sleep(0.3)
mydb.update('schedule', {'is_sent': 1, 'sent_date': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, {'id': post["id"]})
# Deleting sent files
os.system(f'rm -rf "{mp3_file}" "{mp3_file_128}"')
os.system(f'rm -rf "{voice}" "{cover_file}"')
except telegram.error.RetryAfter as e:
log_writer(format_exc())
time.sleep(int(e.retry_after))
except telegram.error.TimedOut:
log_writer(format_exc())
time.sleep(60)
except:
log_writer(format_exc())
await bot.send_message(log_channel, f'''Error in sending post at music automatic section:
{format_exc()}''')
else:
if not post['admin_received']:
sent_messages[channel_username] += 1
# sending post to admin for approval
try:
# Approval keyboard
post_approval_keyboard = [
[
InlineKeyboardButton('🚫 حذف', callback_data=f'delete666_{post["id"]}'),
InlineKeyboardButton('✅ انتشار', callback_data=f'share666_{post["id"]}'),
],
[
InlineKeyboardButton('🚫 حذف همه', callback_data=f'delete_all_{channel_username}'),
InlineKeyboardButton('✳️ انتشار همه', callback_data=f'share_all_{channel_username}'),
]
]
post_approval = await bot.send_photo(channel_admin, f"{file_url}/{channel_username}/{cover_name}", caption=f"""تنظیمات نوع ارسال کانال @{channel_username} شما روی دستی تنظیم شده،
موزیک «{post_name}» توی کانال منتشر بشه؟""", reply_markup=InlineKeyboardMarkup(post_approval_keyboard))
time.sleep(0.3)
mydb.update('schedule', {'admin_received': 1, "is_confirmed": 0, "is_deleted": 0, 'approval_msg_id': post_approval['message_id']}, {'id': post["id"]}, True)
except:
log_writer(format_exc())
await bot.send_message(log_channel, f'''Error in sending post at music automatic section:
{format_exc()}''')
elif post_type == 'video':
# bla bla bla
time.sleep(0.7)
end_time = datetime.datetime.now()
finish_time = end_time - start_time
print()
print()
print(channel_username, '>>>>>>>>', 'finish time:', finish_time, 'Sent message: ', sent_messages[channel_username] )
print()
print()
one_minute = datetime.timedelta(minutes=1)
if int(sent_messages[channel_username]) >= 17:
if finish_time < one_minute:
print()
print()
print('********************* Sleep Mode *********************' )
print()
print()
time.sleep(40)
sent_messages[channel_username] = 0
start_time = datetime.datetime.now()
time.sleep(5)
I see several points in your code snippets that can be improved:
bot = telegram.Bot(token=TOKEN)
and use that as global variable. This has two downsides. Firstly, the bot
object does not receive the settings on pool size and timeouts that you specify with the ApplicationBuilder
. This is the root cause of the exception shown above. If you want the bot
object to have customized settings for that, you'll have to use the request
parameter explicity and pass the corresponding seetings to HTTPXRequest
. Secondly, there is usually no need to manually build an instance of telegram.Bot
because python-telegram-bot
already does that for you. This instance is available as app.bot
or context.bot
in handler/job/error callbacks. This instance does get the settings that you specify via the builder pattern and I highly recommend to use it wherever possibleschedule_work
function is not asyncio-friendly as it makes heavy use of time.sleep
. That function is blocking in the asyncio-sense. At the very least, I urge you to instead use asyncio.sleep
instead. Moreover, schedule_work
uses a while True
loop, which often is a strong indicator for bad design. Both the while True
loop and the sleeping can likely be replaced by instead scheduling callbacks to run at later time. Note that you already are using APSchedulers AsyncIOScheduler
, so you apparently are aware of this possibiliy already ;)AsyncIOScheduler
manually. However, PTB already provides a wrapper around the APScheduler library via the JobQueue
. Using this wrapper has the benefit that it ties in well with PTBs infrastructure such that e.g. you already have context.bot
available - such that you do not need a manually defined global bot
variable :)Disclaimer: I'm currently the maintainer of python-telegram-bot
.