Search code examples
pythonpython-asynciopython-multithreading

Cannot trigger an async function from another threaded function in Python


I am making a discord bot that will grab a json using requests from time to time, and then send the relevant information to a specific channel.

I have the following classes:

  • Helper, which is the discord bot itself, that runs async from the start, inside an asyncio.gather;
  • tasker that controls the interval which calls the class that will do the requests. It runs in a different thread so it doesn't stop the async Helper while it waits
  • getInfo that does the requests, store the info and should talk with Helper

I am having 2 problems right now:

While the tasker is on a different thread, every time I try to talk with Helper via getInfo it gives me the errors RuntimeError: no running event loop and RuntimeWarning: coroutine 'getInfo.discordmsg' was never awaited

If I dont run it on a different thread, however, it does work on the TestStatus: 1 but it makes Helper get stuck and stop running with TestStatus: 2

Anyway, here is the code

import requests
import asyncio
import discord
from discord.ext import commands, tasks
from datetime import datetime, timedelta
import threading

class Helper(discord.Client):
    async def on_ready(self):
        global discordbot, taskervar
        servername = 'ServerName'
        discordbot = self
        self.servidores = dict()
        self.canais = dict()
        for i in range(len(self.guilds)):
            self.servidores[self.guilds[i].name] = {}
            self.servidores[self.guilds[i].name]['guild']=self.guilds[i]
            servidor = self.guilds[i]
            for k in range(len(servidor.channels)):
                canal = servidor.channels[k]
                self.canais[str(canal.name)] = canal
            if 'bottalk' not in self.canais.keys():
                newchan = await self.servidores[self.guilds[i].name]['guild'].create_text_channel('bottalk')
                self.canais[str(newchan.name)] = newchan
            self.servidores[self.guilds[i].name]['canais'] = self.canais
        self.bottalk = self.get_channel(self.servidores[servername]['canais']['bottalk'].id)
        await self.msg("Bot online: " + converteHora(datetime.now(),True))
        print(f'{self.user} has connected to Discord!')
        taskervar.startprocess()
    async def msg(self, msg):
        await self.bottalk.send(msg)
    async def on_message(self, message):
        if message.author == self.user:
            return
        else:
            print(message)

class tasker:
    def __init__(self):
        global discordbot, taskervar
        print('Tasker start')
        taskervar = self
        self.waiter = threading.Event()
        self.lastupdate = datetime.now()
        self.nextupdate = datetime.now()
        self.thread = threading.Thread(target=self.requests)
    def startprocess(self):
        if not self.thread.is_alive():
            self.waiter = threading.Event()
            self.interval = 60*5
            self.thread = threading.Thread(target=self.requests)
            self.thread.start()
    def requests(self):
        while not self.waiter.is_set():
            getInfo()
            self.lastupdate = datetime.now()
            self.nextupdate = datetime.now()+timedelta(seconds=self.interval)
            self.waiter.wait(self.interval)
    def stopprocess(self):
        self.waiter.set()

class getInfo:
    def __init__(self):
        global discordbot, taskervar
        self.requests()
    async def discordmsg(self,msg):
        await discordbot.msg(msg)
    def requests(self):
        jsondata = {"TestStatus": 1}
        if  jsondata['TestStatus'] == 1:
            print('here')
            asyncio.create_task(self.discordmsg("SOMETHING WENT WRONG"))
            taskervar.stopprocess()
            return
        elif jsondata['TestStatus'] == 2:
            print('test')
            hora = converteHora(datetime.now(),True)
            asyncio.create_task(self.discordmsg(str("Everything is fine but not now: " + hora )))
            print('test2')

            
def converteHora(dateUTC, current=False):
    if current:
        response = (dateUTC.strftime("%d/%m/%Y, %H:%M:%S"))
    else:
        response = (dateutil.parser.isoparse(dateUTC)-timedelta(hours=3)).strftime("%d/%m/%Y, %H:%M:%S")
    return response

async def main():
    TOKEN = 'TOKEN GOES HERE'
    tasker()
    await asyncio.gather(
        await Helper().start(TOKEN)
    )


if __name__ == '__main__':
    asyncio.run(main())

Solution

  • Your primary problem is you don't give your secondary thread access to the asyncio event loop. You can't just await and/or create_task a coroutine on a global object (One of many reasons to avoid using global objects in the first place). Here is how you could modify your code to accomplish that:

    class tasker: 
       def __init__(self):
          # ...
          self.loop = asyncio.get_running_loop()
        # ...
    
    class getInfo:
        #...
        def requests(self):
            # replace the create_tasks calls with this. 
            asyncio.run_coroutine_threadsafe(self.discordmsg, taskervar.loop)
    

    This uses your global variables because I don't want to rewrite your entire program, but I still strongly recommend avoiding them and considering a re-write yourself.

    All that being said, I suspect you will still have this bug:

    If I dont run it on a different thread, however, it does work on the TestStatus: 1 but it makes Helper get stuck and stop running with TestStatus: 2

    I can't tell what would cause this issue and I'm running into trouble reproducing this on my machine. Your code is pretty hard to read and is missing some details for reproducibility. I would imagine that is part of the reason why you didn't get an answer in the first place. I'm sure you're aware of this article but might be worth a re-visit for better practices in sharing code. https://stackoverflow.com/help/minimal-reproducible-example