I am writing a personal telegram bot, one of its functions is to show the balance of my accounts market.csgo.com. My code:
import asyncio
import aiogram
import aiohttp
async def get_balance(session, profiles_dict, message):
async with session.get(f'https://market.csgo.com/api/v2/get-money?key={profiles_dict[1][1]}') as resp:
html = await resp.json()
each_wallet = int(html['money'])
await bot.send_message(message.from_user.id,
f'🟢 <a href="{profiles_dict[1][0]}">{profiles_dict[0]}</a> : <i>{each_wallet}</i>',
disable_web_page_preview=True, parse_mode=types.ParseMode.HTML)
async def main(message):
profiles = users()
async with aiohttp.ClientSession(trust_env=True) as session:
tasks = []
if message.text == 'Balance 💸':
await bot.send_message(message.from_user.id, 'Information request. Wait..')
for i in profiles.items():
task = asyncio.ensure_future(get_balance(session, i, message, stats))
await asyncio.gather(*tasks)
if message.text == 'On Sale 💰':
if message.text == 'Timeout Items ⌛':
executor.start_polling(dp, skip_updates=False)
works in async mode, sends aiohttp
requests to the API and outputs information await bot.send_message()
. Result:
Now the launch of the function is implemented through the keyboard button, but how to make the function run every hour? I am aware of the existence of asynchronous task scheduler aioschedule and have seen this example. But they run a function without arguments, but I have as many as 3 of them async def get_balance(session, profiles_dict, message)
. I tried to do this:
import asyncio
import aioschedule
async def scheduler(session, profiles_dict, message):
aioschedule.every().hour.do(get_balance(session, profiles_dict, message))
while True:
await aioschedule.run_pending()
await asyncio.sleep(1)
async def on_startup(session, profiles_dict, message):
asyncio.create_task(scheduler(session, profiles_dict, message))
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=False, on_startup=on_startup(session, profiles_dict, message))
Obviously it doesn't work that way.
My question is:
How to run an async function with arguments that sends aiohttp
requests through task scheduling aioschedule
and display the result through telegram aiogram
import aiogram
import asyncio
import aiohttp
import aioschedule
async def get_balance(session, profiles_dict):
async with session.get(f'https://market.csgo.com/api/v2/get-money?key={profiles_dict[1][1]}') as resp:
html = await resp.json()
each_wallet = int(html['money'])
await bot.send_message(MY_TELEGRAM_ID,
f'🟢 <a href="{profiles_dict[1][0]}">{profiles_dict[0]}</a> : <i>{each_wallet}</i>',
disable_web_page_preview=True, parse_mode=types.ParseMode.HTML)
async def main(message):
profiles = users()
async with aiohttp.ClientSession(trust_env=True) as session:
tasks = []
if message.text == 'Balance 💸':
await bot.send_message(message.from_user.id, 'Information request. Wait..')
for i in profiles.items():
task = asyncio.ensure_future(get_balance(session, i))
await asyncio.gather(*tasks)
if message.text == 'On Sale 💰':
if message.text == 'Timeout Items ⌛':
# Client session get_balance function
async def session_get_balance():
profiles = users()
async with aiohttp.ClientSession(trust_env=True) as session:
tasks = []
for i in profiles.items():
task = asyncio.ensure_future(get_balance(session, i))
await asyncio.gather(*tasks)
# Schedule functions by time
async def scheduler():
while True:
await aioschedule.run_pending()
await asyncio.sleep(1)
# Function at start
async def on_startup(_):
# Launch telegram bot
if __name__ == "__main__":
executor.start_polling(dp, skip_updates=True, on_startup=on_startup)
Since this is my personal bot, instead of message.from_user.id
I specified my MY_TELEGRAM_ID
await bot.send_message(MY_TELEGRAM_ID,
f'🟢 <a href="{profiles_dict[1][0]}">{profiles_dict[0]}</a> : <i>{each_wallet}</i>',
disable_web_page_preview=True, parse_mode=types.ParseMode.HTML)