Search code examples
taskpython-asynciopython-3.7

Asyncio start a task inside another task?


i'm trying to learn the usage of asyncio but i've met a roadblock.

What am i trying to do? I'm trying to create a number of workers that as soon as they're created they start their own task. So while task3 is being created and started task1 should already be executing its task. I'm doing that by using a loop inside a single coroutine, at eache iteration the worker is created and starts.

The problem i'm facing: When the first worker completes its task the others just stop and don't continue.

This is my code:

import asyncio

class Worker:
    def __init__(self, session_name):
        self.name = session_name
        self.messagelist = ['--------1', '--------2', '--------3', '--------4']

    async def job(self):
        for i, message in enumerate(self.messagelist):
            print(f"### Worker {self.name} says {message}")
            await asyncio.sleep(20)

class Testmanager:
    def __init__(self):
        self.workers_name = ['test0', 'test1', 'test2', 'test3', 'test4']

    async def create_and_start_workers(self, loop):
        for i, name in enumerate(self.workers_name):
            worker = Worker(name)
            print(f"# Created worker {worker.name}")
            loop.create_task(worker.job())
            print(f"## Started worker {worker.name}")
            await asyncio.sleep(10)

    def start(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.create_and_start_workers(loop))
        loop.close()

manager = Testmanager()
manager.start()

When run initially it works as expected, but after a while i get a lot of:

Task was destroyed but it is pending!
task: <Task pending coro=<Worker.job() done, defined at PATH_REDACTED> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000026AF6315438>()]>>

What am i doing wrong?

Thanks for the help.


Solution

  • What am i doing wrong?

    You are never awaiting the tasks you create to run in parallel. For example:

        async def create_and_start_workers(self, loop):
            tasks = []
            for i, name in enumerate(self.workers_name):
                worker = Worker(name)
                print(f"# Created worker {worker.name}")
                tasks.append(loop.create_task(worker.job()))
                print(f"## Started worker {worker.name}")
                await asyncio.sleep(10)
            await asyncio.gather(*tasks)