Tags: python, python-3.x, python-asyncio, generator

How can time spent in asynchronous generators be measured?


I want to measure the time spent by a generator, i.e. the time during which it blocks the main loop.

Let's say I have the following two generators:

import asyncio
import time

async def run():
    for i in range(5):
        await asyncio.sleep(0.2)
        yield i
    return

async def walk():
    for i in range(5):
        time.sleep(0.3)
        yield i
    return

I want to be able to measure that run spends around 0.0s per iteration, while walk uses at least 0.3s per iteration.

I wanted to use something similar to this, but wasn't able to make it work for me.

Clarification:

I want to exclude the time spent in any await section. If for any reason the coroutine is suspended, I don't want that time to be taken into account.
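
For illustration, this is the naive measurement the clarification rules out: timing each iteration from the consumer's side with time.perf_counter() (an assumed first attempt, not code from the question; outside_timings is a hypothetical helper). It charges the awaited asyncio.sleep() to run exactly as it charges the blocking time.sleep() to walk, so both look slow.

```python
import asyncio
import time


async def run():
    for i in range(5):
        await asyncio.sleep(0.2)  # suspends: the loop is free meanwhile
        yield i


async def walk():
    for i in range(5):
        time.sleep(0.3)  # blocks the whole event loop
        yield i


async def outside_timings(agen):
    """Wall-clock time between consecutive items, measured by the consumer."""
    durations = []
    start = time.perf_counter()
    async for _ in agen:
        now = time.perf_counter()
        durations.append(now - start)
        start = now
    return durations


async def main():
    return await outside_timings(run()), await outside_timings(walk())


run_times, walk_times = asyncio.run(main())
# Both lists show roughly 0.2s and 0.3s per item: from the outside, suspended
# time (run) cannot be told apart from blocking time (walk).
print([round(t, 2) for t in run_times])
print([round(t, 2) for t in walk_times])
```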


Solution

  • So, one thing (a bit tough) is to measure times in regular coroutines: I got that working with a decorator.

    However, when you go a step further into async generators, it is another beast, and I am still trying to figure it out. The objects they return mix publicly exposed async-iterator methods (__anext__, asend, etc.) with traditional iterator methods, and I have not yet worked out how to wrap them (I've just opened PEP 525 to see if I can make sense of it).
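
    The mix can be seen by poking at the object an async generator's __anext__() returns, outside any event loop. A small sketch, with agen as a hypothetical one-item generator: the returned object is awaitable but is also an iterator with send()/throw(), and the yielded value comes back inside StopIteration.

```python
async def agen():
    yield 1


gen = agen()
step = gen.__anext__()
print(type(step).__name__)  # async_generator_asend

# It is awaitable, but it also carries the classic iterator/generator
# methods that an event loop (or `await`) drives: __next__, send, throw.
print([name for name in ("__await__", "__next__", "send", "throw")
       if hasattr(step, name)])

# Driving it by hand: when the async generator yields, send() raises
# StopIteration carrying the yielded value...
try:
    step.send(None)
except StopIteration as stop:
    first_item = stop.value
print(first_item)  # 1

# ...and once the generator is exhausted, StopAsyncIteration is raised.
try:
    gen.__anext__().send(None)
except StopAsyncIteration:
    exhausted = True
print(exhausted)  # True
```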

    As for regular coroutines, there is a gotcha: if you make an awaitable class (my decorator), Python demands that its __await__ method return an iterator, i.e. something with a __next__ method, which is what gets called. But native Python coroutines have no __next__ (asyncio calls send() on those), so I had to make this "transpose" in order to be able to measure the time spent in coroutines.
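
    A quick check of the point above, using a hypothetical coroutine answer(): the native coroutine object has send() but no __next__, while the object returned by its own __await__() is an iterator with __next__.

```python
async def answer():
    return 42


coro = answer()
# Native coroutine objects expose send()/throw()/close(), but no __next__.
print(hasattr(coro, "send"), hasattr(coro, "__next__"))  # True False

# Driving it by hand, as an event loop does: send(None) runs the coroutine
# until it suspends or finishes; finishing raises StopIteration(return value).
try:
    coro.send(None)
except StopIteration as stop:
    result = stop.value
print(result)  # 42

# The iterator that `await` actually drives is coro.__await__(), and that
# wrapper object does have __next__.
wrapper = answer().__await__()
print(hasattr(wrapper, "__next__"))  # True
wrapper.close()  # close the never-started coroutine to avoid a warning
```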

    import asyncio
    import time
    
    
    class ProfileCoro:
        def __init__(self, corofunc):
            self.corofunc = corofunc
    
        def __call__(self, *args, **kw):
            # WARNING: not parallel execution-safe: fix by
            # keeping "self.ellapsed" in a proper contextvar
            self.ellapsed = 0
            self.coro = self.corofunc(*args, **kw)
            return self
    
        def __await__(self):
            return self
    
        def __iter__(self):
            return self
    
        def __next__(self):
            return self.send(None)
    
        def throw(self, *args):
            print(f"Arghh!, got an {args[0]}")
            # Return the value the coroutine yields next, so `await` keeps going
            return self.coro.throw(*args)
    
        def send(self, arg):
            start = time.monotonic()
            try:
                result = self.coro.send(arg)
            except StopIteration:
                duration = time.monotonic() - start
                self.ellapsed += duration
                print(f"Elapsed time in execution of {self.corofunc.__name__}: {self.ellapsed:.05f}s")
                raise
            duration = time.monotonic() - start
            self.ellapsed += duration
            return result
    
        def __repr__(self):
            return f"<ProfileCoro wrapper for {self.corofunc}>"
    
    @ProfileCoro
    async def run():
        for i in range(5):
            await asyncio.sleep(0.2)
            # yield i
        return 1
    
    @ProfileCoro
    async def walk():
        for i in range(5):
            time.sleep(0.3)
            #yield i
        return 3
    
    
    async def main():
        await run()
        await walk()
        return
    
    asyncio.run(main())
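
    The WARNING comment in the code suggests moving the counter into a contextvar. A simpler alternative, not part of the original code, is to have the decorator build a fresh wrapper object per call, so concurrent calls never reset each other's counters. The sketch below assumes that design; TimedAwait, profile_coro and work are hypothetical names. It also times throw(), forwards close(), and uses try/finally so a step is counted whether it suspends or finishes.

```python
import asyncio
import functools
import time


class TimedAwait:
    """Awaitable that accumulates the time spent *running* one coroutine.

    One instance per call, so concurrent calls never share or reset a counter.
    """

    def __init__(self, coro):
        self.coro = coro
        self.elapsed = 0.0

    def __await__(self):
        return self

    def __iter__(self):
        return self

    def __next__(self):
        # `await` uses __next__ when the value being sent in is None.
        return self.send(None)

    def send(self, value):
        start = time.perf_counter()
        try:
            return self.coro.send(value)
        finally:
            # Runs whether the coroutine suspended or finished (StopIteration),
            # so only the time it actually ran is added.
            self.elapsed += time.perf_counter() - start

    def throw(self, *args):
        start = time.perf_counter()
        try:
            return self.coro.throw(*args)
        finally:
            self.elapsed += time.perf_counter() - start

    def close(self):
        return self.coro.close()


def profile_coro(corofunc):
    """Hypothetical decorator: each call returns its own TimedAwait."""

    @functools.wraps(corofunc)
    def wrapper(*args, **kwargs):
        return TimedAwait(corofunc(*args, **kwargs))

    return wrapper


@profile_coro
async def work(block):
    for _ in range(3):
        time.sleep(block)          # blocks the event loop: counted
        await asyncio.sleep(0.05)  # suspended: not counted
    return block


async def main():
    fast, slow = work(0.01), work(0.03)
    await asyncio.gather(fast, slow)  # both calls run concurrently
    return fast.elapsed, slow.elapsed


fast_elapsed, slow_elapsed = asyncio.run(main())
print(f"fast: {fast_elapsed:.3f}s, slow: {slow_elapsed:.3f}s")
```

    Because each call owns its counter, the two overlapping calls report their own blocking time (roughly 0.03s and 0.09s here), and the 0.05s spent in each await asyncio.sleep() is left out.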
    

    To be continued, maybe, if I can figure out how to wrap async generators.

    (I think most existing profiling tools use the hooks the language provides for debuggers and tracing, enabled with sys.settrace(): everything is simply "visible" to them in a callback, with no need to wrap all the inner calls made by the async machinery and the asyncio loop.)
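
    A sketch of that tracing-hook route, using the sibling hook sys.setprofile() (a swapped-in technique, not the code in this answer; FrameTimer is a hypothetical name). It assumes CPython reports every start or resumption of a generator/coroutine frame as a "call" event and every suspension (yield or await) or finish as a "return" event, so summing the time between each pair excludes time spent waiting in an await without wrapping any of the async machinery. The hook is per-thread, so it only sees code running in the thread that installed it.

```python
import asyncio
import sys
import time


class FrameTimer:
    """Accumulate time during which frames of the given functions are running.

    Assumes sys.setprofile() fires "call" whenever a generator/coroutine frame
    starts or resumes, and "return" whenever it suspends or finishes.
    """

    def __init__(self, *funcs):
        self.names = {f.__code__: f.__name__ for f in funcs}
        self.totals = {f.__name__: 0.0 for f in funcs}
        self._started = {}  # frame -> perf_counter() at (re)entry

    def _profile(self, frame, event, arg):
        name = self.names.get(frame.f_code)
        if name is None:
            return  # not one of the functions we are timing
        if event == "call":
            self._started[frame] = time.perf_counter()
        elif event == "return":
            start = self._started.pop(frame, None)
            if start is not None:
                self.totals[name] += time.perf_counter() - start

    def __enter__(self):
        sys.setprofile(self._profile)
        return self

    def __exit__(self, *exc_info):
        sys.setprofile(None)
        return False


async def run():
    for i in range(5):
        await asyncio.sleep(0.2)
        yield i


async def walk():
    for i in range(5):
        time.sleep(0.3)
        yield i


async def main():
    async for _ in run():
        pass
    async for _ in walk():
        pass


with FrameTimer(run, walk) as timer:
    asyncio.run(main())

for name, total in timer.totals.items():
    print(f"{name}: {total / 5:.3f}s per iteration")
```

    With the question's run and walk, this should report close to 0.0s per iteration for run and about 0.3s for walk.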

    ... So, here is the code, instrumented to also catch the time spent in async generators.

    It only covers the happy path: if there are complicated awaitable classes implementing or making use of asend or athrow, this won't do, but for a simple async generator function plugged into an async for statement it now works.

    DISCLAIMER: there might be unused code, and even unused state, in the code below; I went back and forth quite a bit to get it working (a lot of it because I had not noticed that __anext__ has to return an awaitable, which in practice meant making it async itself). Nonetheless, here it goes:

    import asyncio
    import time
    
    from functools import wraps
    
    
    async def _a():
        yield 1
    
    async_generator_asend_type = type(_a().__anext__())
    
    
    class ProfileCoro:
        def __init__(self, corofunc):
            self.corofunc = corofunc
            self.measures = 0
    
        def measure_ellapsed(func):
            @wraps(func)
            def wrapper(self, *args, **kw):
                self.measures += 1
                if self.measures > 1:
                    try:
                        return func(self, *args, **kw)
                    finally:
                        self.measures -= 1
                start = time.monotonic()
                try:
                    result = func(self, *args, **kw)
                except (StopIteration, StopAsyncIteration):
                    self.ellapsed += time.monotonic() - start
                    #self.print_ellapsed()
                    raise
                finally:
                    self.measures -= 1
                self.ellapsed += time.monotonic() - start
    
                return result
            return wrapper
    
        def print_ellapsed(self):
            name = getattr(self.corofunc, "__name__", "inner_iterator")
            print(f"Elapsed time in execution of {name}: {self.ellapsed:.05f}s")
    
        def __call__(self, *args, **kw):
            # WARNING: not parallel execution-safe: fix by
            # keeping "self.ellapsed" in a proper contextvar
            self.ellapsed = 0
            self.measures = 0
            if not isinstance(self.corofunc, async_generator_asend_type):
                self.coro = self.corofunc(*args, **kw)
            else:
                self.coro = self.corofunc
            return self
    
    
        def __await__(self):
            return self
    
        def __iter__(self):
            return self
    
        @measure_ellapsed
        def __next__(self):
            target = self.coro
            if hasattr(target, "__next__"):
                return target.__next__()
            elif hasattr(target, "send"):
                return target.send(None)
    
        async def athrow(self, exc):
            print(f"Arghh!, got an async-iter-mode {exc}")
            return await self.async_iter.athrow(exc)
    
        def throw(self, *args):
            print(f"Arghh!, got an {args[0]}")
            # Return the value the wrapped object yields next, so `await` keeps going
            return self.coro.throw(*args)
    
        @measure_ellapsed
        def send(self, arg):
            return self.coro.send(arg)
    
        def __aiter__(self):
            return self
    
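        # async def asend(self, value):
        #     ...
    
        async def aclose(self):
            # Async generators have aclose(), not close(); self.coro is the
            # wrapped async generator when used with `async for`
            return await self.coro.aclose()
    
        def close(self):
            # Coroutine-protocol close(): forward to the wrapped object
            return self.coro.close()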
    
        async def __anext__(self):
            if not hasattr(self, "async_iter"):
                self.async_iter = aiter(self.coro)
            self.inner_profiler = ProfileCoro(self.async_iter.__anext__())
            #start = time.monotonic()
            try:
                result = await self.inner_profiler()
            except StopAsyncIteration:
                #self.print_ellapsed()
                raise
            finally:
                self.ellapsed += self.inner_profiler.ellapsed
            return result
    
        def __repr__(self):
            return f"<ProfileCoro wrapper for {self.corofunc}>"
    
    @ProfileCoro
    async def run():
        for i in range(5):
            await asyncio.sleep(0.05)
            # yield i
        return 1
    
    @ProfileCoro
    async def walk():
        for i in range(5):
            time.sleep(0.05)
            #yield i
        return 3
    
    
    @ProfileCoro
    async def irun():
        for i in range(5):
            await asyncio.sleep(0.05)
            yield i
    
    @ProfileCoro
    async def iwalk():
        for i in range(5):
            time.sleep(0.05)
            yield i
    
    
    async def main():
        await run()
        run.print_ellapsed()
        await walk()
        walk.print_ellapsed()
        async for _ in irun():
            print(".", end="", flush=True)
        irun.print_ellapsed()
        async for _ in iwalk():
            pass
        iwalk.print_ellapsed()
        return
    
    asyncio.run(main())
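
    On the point in the disclaimer about __anext__: the async-iterator protocol only requires __anext__ to return an awaitable; declaring it with async def, as the code above does, is just the most convenient way to get one. A minimal sketch, with Countdown as a hypothetical class whose plain def __anext__ delegates to a coroutine:

```python
import asyncio


class Countdown:
    """Async iterator whose __anext__ is a plain method returning an awaitable."""

    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    def __anext__(self):
        # Only the *result* of __anext__ has to be awaitable; calling an
        # async def helper is one easy way to produce it.
        return self._next()

    async def _next(self):
        if self.n <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # a real suspension point, just to show it works
        self.n -= 1
        return self.n


async def main():
    return [i async for i in Countdown(3)]


items = asyncio.run(main())
print(items)  # [2, 1, 0]
```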