Search code examples
pythonpython-asynciopython-multithreading

A simple way to write an already running event_loop to add a new task


What I want to do is to new add tasks later on to a long-lasting (and preferably empty) event_loop.

I also want to end the event_loop at an arbitrary time.

This code is intended to be run on a server.

I've tried it myself

(1)Use call_soon, create_task, and ensure_future when creating tasks

(2)Use run_forever to run the task

I wrote the following code that I thought I could implement in my own way.

However, this code will do something that doesn't need to be done every 10 seconds.

Moreover i can't arbitrarily end the event_loop.

In summary, please tell me the following two points

(1)How to make the first event_loop simple and not block tasks that will be added later

(2)How to stop event_loop at any given time

import threading
import asyncio


async def long_task():
    i = 0
    while True:
        print(i)
        await asyncio.sleep(10)
        i = i+1


async def create_long_task(loop):
    t = loop.create_task(long_task())
    await t


def run_long_task():
    loop.run_until_complete(create_long_task(loop))


def new_task():
    print('----------------------------')


loop = asyncio.get_event_loop()
thread1 = threading.Thread(target=run_long_task)
thread1.start()

thread2 = threading.Thread(target=new_task)
thread2.start()

Postscript

I read the comments and rewrote it.

But I can't achieve it even with this code.

If I change "await asyncio.sleep(3)" to "time.sleep(3)" in "long_task()", "new_task()" will be blocked.

Rewriting it to "await time.sleep(3)" gives the following error.

[TypeError: object NoneType can't be used in 'await' expression]

How do I rewrite it?

What I'd like to do is to create an empty event_loop first since I don't want to add more event_loops, and then add the necessary tasks to the event_loop.

I want to start the server and have an empty loop running, and then add tasks as needed.

import threading
import asyncio


async def long_task():
   i = 0
   while True:
       print(i)
       await asyncio.sleep(10)
       i = i+1


async def create_long_task(loop):
   t = loop.create_task(long_task())
   await t


def run_long_task():
   loop.run_until_complete(create_long_task(loop))


async def new_task():
   i = 0
   while True:
       print("---{}--".format(i))
       await asyncio.sleep(10)
       i = i+1


def add_task(loop):
   t = loop.create_task(new_task())
   t


def add_thread(loop):
   thread = threading.Thread(
       target=add_task, args=(loop,))
   thread.start()
   thread.join
   print("new thread end")


loop = asyncio.get_event_loop()
thread1 = threading.Thread(target=run_long_task)
thread1.start()

add_thread(loop)

Solution

  • You can use asyncio.run_coroutine_threadsafe to send tasks to a running loop. You can start a loop by running run_forever in a separate thread, and then submit tasks to that loop, just as you wanted:

    import threading, asyncio, time
    
    def run_loop(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    async def new_task():
        print('a')
        await asyncio.sleep(1)
        print('b')
    
    def main():
        loop = asyncio.new_event_loop()
        threading.Thread(target=run_loop, args=(loop,)).start()
    
        asyncio.run_coroutine_threadsafe(new_task(), loop)
        # sleep while the task runs in the background
        time.sleep(2)
        print('exiting')
        loop.call_soon_threadsafe(loop.stop)
    
    if __name__ == '__main__':
        main()