I have a very simple telegram bot and want to send an automated message to my bot users every 1 hour for example.
The problem Is that when I run celery -A main worker --loglevel=WARNING -B
scheduled tasks is not running but when I remove telegram_bot.runpolling()
it works exactly as it expected. Any idea how I can fix this issue?
import os
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
print("Starting App")
from reminders import get_reminder, add_reminder
from celery import Celery
from time import sleep
app = Celery('main', broker='redis://localhost')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('printer') every 10 seconds.
sender.add_periodic_task(10.0, printer.s(), name='add every 10')
@app.task
def printer():
print("Celery Task Scheduler")
async def add_task(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
add_reminder.add_for_day(' '.join(update.message.text.split()[1:-1]), update.message.text.split()[-1])
await update.message.reply_text("Task Added")
telegram_app = ApplicationBuilder().token(os.environ['TOKEN']).build()
telegram_app.add_handler(CommandHandler("add", add_task))
#telegram_app.run_polling()
Application.run_polling
is designed mostly for the cases where you don't have any other components besides python-telegram-bot
in your program. It uses asyncio.loop.run_forever()
which is probably why celery
doesn't get a chance to do anything.
If you want to combine PTB with celery
, you can probably try to substitute run_polling
with a custom combination of Application.{initialize, start, stop, shutdown}
and telegram_app.updater.{initialize, start_polling, stop, shutdown
(see the docs of run_polling
and the methods linked therein for more info, maybe also the source code of run_polling
).
An alternative could be to use PTBs built-in JobQueue
for scheduling the tasks instead of celery
.
Disclaimer: I'm currently the maintainer of python-telegram-bot
.