Search code examples
pythonasync-awaitscheduleaiohttpaiogram

How to run an async function through the task scheduler (aioschedule)?


I am writing a personal telegram bot, one of its functions is to show the balance of my accounts market.csgo.com. My code:

import asyncio
import aiogram
import aiohttp

...

async def get_balance(session, profiles_dict, message):
    async with session.get(f'https://market.csgo.com/api/v2/get-money?key={profiles_dict[1][1]}') as resp:
        html = await resp.json()

        each_wallet = int(html['money'])

        await bot.send_message(message.from_user.id,
            f'🟢 <a href="{profiles_dict[1][0]}">{profiles_dict[0]}</a> : <i>{each_wallet}</i>',
            disable_web_page_preview=True, parse_mode=types.ParseMode.HTML)

...

@dp.message_handler(content_types=['text'])
async def main(message):
    profiles = users()

    async with aiohttp.ClientSession(trust_env=True) as session:
        tasks = []

        if message.text == 'Balance 💸':
            await bot.send_message(message.from_user.id, 'Information request. Wait..')

            for i in profiles.items():
                task = asyncio.ensure_future(get_balance(session, i, message, stats))
                tasks.append(task)
            await asyncio.gather(*tasks)

        if message.text == 'On Sale 💰':
            ...

        if message.text == 'Timeout Items ⌛':
            ...


executor.start_polling(dp, skip_updates=False)

get_balance() works in async mode, sends aiohttp requests to the API and outputs information await bot.send_message(). Result:

enter image description here

Now the launch of the function is implemented through the keyboard button, but how to make the function run every hour? I am aware of the existence of asynchronous task scheduler aioschedule and have seen this example. But they run a function without arguments, but I have as many as 3 of them async def get_balance(session, profiles_dict, message). I tried to do this:

import asyncio
import aioschedule

async def scheduler(session, profiles_dict, message):
    aioschedule.every().hour.do(get_balance(session, profiles_dict, message))
    while True:
        await aioschedule.run_pending()
        await asyncio.sleep(1)


async def on_startup(session, profiles_dict, message):
    asyncio.create_task(scheduler(session, profiles_dict, message))


if __name__ == '__main__':
    executor.start_polling(dp, skip_updates=False, on_startup=on_startup(session, profiles_dict, message))

Obviously it doesn't work that way. enter image description here

My question is:

How to run an async function with arguments that sends aiohttp requests through task scheduling aioschedule and display the result through telegram aiogram?


Solution

  • Solution:

    import aiogram
    import asyncio
    import aiohttp
    import aioschedule
    
    ...
    
    async def get_balance(session, profiles_dict):
        async with session.get(f'https://market.csgo.com/api/v2/get-money?key={profiles_dict[1][1]}') as resp:
            html = await resp.json()
    
            each_wallet = int(html['money'])
    
            await bot.send_message(MY_TELEGRAM_ID,
                f'🟢 <a href="{profiles_dict[1][0]}">{profiles_dict[0]}</a> : <i>{each_wallet}</i>',
                disable_web_page_preview=True, parse_mode=types.ParseMode.HTML)
    
    ...
    
    @dp.message_handler(content_types=['text'])
    async def main(message):
        profiles = users()
    
        async with aiohttp.ClientSession(trust_env=True) as session:
            tasks = []
    
            if message.text == 'Balance 💸':
                await bot.send_message(message.from_user.id, 'Information request. Wait..')
    
                for i in profiles.items():
                    task = asyncio.ensure_future(get_balance(session, i))
                    tasks.append(task)
                await asyncio.gather(*tasks)
    
            if message.text == 'On Sale 💰':
                ...
    
            if message.text == 'Timeout Items ⌛':
                ...
    
    
    # Client session get_balance function
    async def session_get_balance():
        profiles = users()
    
        async with aiohttp.ClientSession(trust_env=True) as session:
            tasks = []
    
            for i in profiles.items():
                task = asyncio.ensure_future(get_balance(session, i))
                tasks.append(task)
            await asyncio.gather(*tasks)
    
    
    # Schedule functions by time
    async def scheduler():
        aioschedule.every().hour.do(session_get_balance)
        while True:
            await aioschedule.run_pending()
            await asyncio.sleep(1)
    
    
    # Function at start
    async def on_startup(_):
        asyncio.create_task(scheduler())
    
    
    # Launch telegram bot
    if __name__ == "__main__":
        executor.start_polling(dp, skip_updates=True, on_startup=on_startup)
    

    Since this is my personal bot, instead of message.from_user.id I specified my MY_TELEGRAM_ID.

    await bot.send_message(MY_TELEGRAM_ID,
                f'🟢 <a href="{profiles_dict[1][0]}">{profiles_dict[0]}</a> : <i>{each_wallet}</i>',
                disable_web_page_preview=True, parse_mode=types.ParseMode.HTML)