I want to measure the time spent inside a generator (that is, the time during which it blocks the main loop).
Let's say that I have the following two generators:
async def run():
    """Yield 0..4, awaiting ``asyncio.sleep(0.2)`` before each value."""
    for value in range(5):
        # Cooperative wait: the generator is suspended at this await.
        await asyncio.sleep(0.2)
        yield value
async def walk():
    """Yield 0..4, calling the blocking ``time.sleep(0.3)`` before each value."""
    for value in range(5):
        # Synchronous sleep: there is no await here, so nothing else can run.
        time.sleep(0.3)
        yield value
I want to measure that `run` spent around `0.0s` per iteration, while `walk` used at least `0.3s`.
I wanted to use something similar to this, but wasn't able to make it work for me.
Clarification:
I want to exclude the time spent in any `await` section. If the coroutine is suspended for any reason, I don't want to take that time into account.
So, one thing (a bit tough) is to measure the time spent in regular coroutines - I got that going with a decorator.
However, when you go a step further into async generators, it is another beast - I am still trying to figure it out. It is a mix of publicly exposed async-iterator methods (`__anext__`, `asend`, etc.) with traditional iterator methods on the objects they return, which I have not yet figured out (I've just opened PEP 525 to see if I can make sense of it).
As for regular coroutines, there is a gotcha: if you make an awaitable class (my decorator), asyncio will demand that its `__await__` method returns something with a `__next__` method, which will be called. But native Python coroutines have no `__next__`: asyncio calls `send()` on those - so I had to make this "transposition" in order to be able to measure the time (in coroutines).
import asyncio
import time
class ProfileCoro:
    """Decorator that accumulates the time a coroutine spends actually running.

    Calling the decorated object creates the wrapped coroutine and returns the
    wrapper itself, which is awaitable. Every resumption of the coroutine
    (``send``/``throw``) is timed with ``time.monotonic()`` and added to
    ``self.ellapsed``; the time between resumptions, while the coroutine is
    suspended at an ``await``, is therefore not counted.

    When the wrapped coroutine finishes (raises ``StopIteration``) the total
    is printed.
    """

    def __init__(self, corofunc):
        """Store the coroutine function to profile.

        ``ellapsed`` and ``coro`` are initialised here so the attributes exist
        even before the first call.
        """
        self.corofunc = corofunc
        self.ellapsed = 0
        self.coro = None

    def __call__(self, *args, **kw):
        """Create the wrapped coroutine, reset the counter and return ``self``."""
        # WARNING: not parallel execution-safe: fix by
        # keeping "self.ellapsed" in a proper contextvar
        self.ellapsed = 0
        self.coro = self.corofunc(*args, **kw)
        return self

    def __await__(self):
        """Return ``self``: the wrapper is its own iterator for ``await``."""
        return self

    def __iter__(self):
        return self

    def __next__(self):
        """Resume the wrapped coroutine; native coroutines only offer ``send``."""
        return self.send(None)

    def _timed_step(self, step, *args):
        """Call ``step(*args)`` and add its wall-clock duration to ``ellapsed``.

        The duration is recorded whether the step returns, finishes the
        coroutine (``StopIteration``) or raises any other exception. The
        summary line is printed only when the coroutine finishes.
        """
        finished = False
        start = time.monotonic()
        try:
            return step(*args)
        except StopIteration:
            finished = True
            raise
        finally:
            self.ellapsed += time.monotonic() - start
            if finished:
                print(f"Ellapsed time in execution of {self.corofunc.__name__}: {self.ellapsed:.05f}s")

    def throw(self, exc, *rest):
        """Throw an exception into the wrapped coroutine and return its result.

        Extra positional arguments (the legacy ``throw(type, value, tb)``
        form) are forwarded unchanged. The value produced by the coroutine
        after handling the exception is returned to the caller, as the
        generator protocol requires.
        """
        print(f"Arghh!, got an {exc}")
        return self._timed_step(self.coro.throw, exc, *rest)

    def send(self, arg):
        """Send ``arg`` into the wrapped coroutine, timing the resumption."""
        return self._timed_step(self.coro.send, arg)

    def __repr__(self):
        return f"<ProfileCoro wrapper for {self.corofunc}>"
@ProfileCoro
async def run():
    """Await ``asyncio.sleep(0.2)`` five times, then return 1."""
    for _ in range(5):
        await asyncio.sleep(0.2)
    # Plain-coroutine variant: no ``yield i`` in the loop.
    return 1
@ProfileCoro
async def walk():
    """Call the blocking ``time.sleep(0.3)`` five times, then return 3."""
    for _ in range(5):
        time.sleep(0.3)
    # Plain-coroutine variant: no ``yield i`` in the loop.
    return 3
async def main():
    """Await the profiled ``run`` and then ``walk``, one after the other."""
    for profiled in (run, walk):
        await profiled()


asyncio.run(main())
To maybe be continued, if I can figure out how to wrap the async generators.
(I think most existing profiling tools use the hooks the language provides for debuggers and tracing, enabled with `sys.settrace()`: everything is just "visible" to them in a callback, with no need to worry about wrapping all the inner calls made by the async machinery and the asyncio loop.)
... So, here is the code instrumented to also catch the time in async generators.
It covers the happy path: if there are complicated awaitable classes implementing or making use of `asend` or `athrow`, this won't do - but for a simple async generator function plugged into an `async for` statement it now works:
DISCLAIMER: there might be unused code, and even unused states, in the code below - I went back and forth quite a bit to get it working (a lot of it because I had not paid attention to the fact that `__anext__` had to be async itself). Nonetheless, here it goes:
import asyncio
import time
from functools import wraps
async def _a():
    """Throwaway async generator, used only to get at an interpreter type."""
    yield 1


# Type of the awaitable returned by an async generator's ``__anext__()``.
# It is obtained from a live instance of the throwaway generator.
async_generator_asend_type = type(_a().__anext__())
class ProfileCoro:
def __init__(self, corofunc):
self.corofunc = corofunc
self.measures = 0
def measure_ellapsed(func):
@wraps(func)
def wrapper(self, *args, **kw):
self.measures += 1
if self.measures > 1:
try:
return func(self, *args, **kw)
finally:
self.measures -= 1
start = time.monotonic()
try:
result = func(self, *args, **kw)
except (StopIteration, StopAsyncIteration):
self.ellapsed += time.monotonic() - start
#self.print_ellapsed()
raise
finally:
self.measures -= 1
self.ellapsed += time.monotonic() - start
return result
return wrapper
def print_ellapsed(self):
name = getattr(self.corofunc, "__name__", "inner_iterator")
print(f"Ellapsed time in execution of {name}: {self.ellapsed:.05f}s")
def __call__(self, *args, **kw):
# WARNING: not parallel execution-safe: fix by
# keeping "self.ellapsed" in a proper contextvar
self.ellapsed = 0
self.measures = 0
if not isinstance(self.corofunc, async_generator_asend_type):
self.coro = self.corofunc(*args, **kw)
else:
self.coro = self.corofunc
return self
def __await__(self):
return self
def __iter__(self):
return self
@measure_ellapsed
def __next__(self):
target = self.coro
if hasattr(target, "__next__"):
return target.__next__()
elif hasattr(target, "send"):
return target.send(None)
async def athrow(self, exc):
print(f"Arghh!, got an async-iter-mode {exc}")
return await self.async_iter.athrow(exc)
def throw(self, exc):
print(f"Arghh!, got an {exc}")
self.coro.throw(exc)
@measure_ellapsed
def send(self, arg):
return self.coro.send(arg)
def __aiter__(self):
return self
#async def asend(self, value):
...
async def aclose(self):
return await self.async_iter.close()
def close(self):
return self.async_iter.close()
async def __anext__(self):
if not hasattr(self, "async_iter"):
self.async_iter = aiter(self.coro)
self.inner_profiler = ProfileCoro(self.async_iter.__anext__())
#start = time.monotonic()
try:
result = await self.inner_profiler()
except StopAsyncIteration:
#self.print_ellapsed()
raise
finally:
self.ellapsed += self.inner_profiler.ellapsed
return result
def __repr__(self):
return f"<ProfileCoro wrapper for {self.corofunc}>"
@ProfileCoro
async def run():
    """Await ``asyncio.sleep(0.05)`` five times, then return 1."""
    for _ in range(5):
        await asyncio.sleep(0.05)
    # Plain-coroutine variant: no ``yield i`` in the loop.
    return 1
@ProfileCoro
async def walk():
    """Call the blocking ``time.sleep(0.05)`` five times, then return 3."""
    for _ in range(5):
        time.sleep(0.05)
    # Plain-coroutine variant: no ``yield i`` in the loop.
    return 3
@ProfileCoro
async def irun():
    """Async generator: yield 0..4, awaiting ``asyncio.sleep(0.05)`` before each."""
    for value in range(5):
        await asyncio.sleep(0.05)
        yield value
@ProfileCoro
async def iwalk():
    """Async generator: yield 0..4, calling blocking ``time.sleep(0.05)`` before each."""
    for value in range(5):
        time.sleep(0.05)
        yield value
async def main():
    """Run each profiled example in turn and print its accumulated time."""
    # Plain coroutines: await, then report.
    for profiled in (run, walk):
        await profiled()
        profiled.print_ellapsed()

    # Async generators: drain with ``async for``, then report.
    async for _item in irun():
        print(".", end="", flush=True)
    irun.print_ellapsed()

    async for _item in iwalk():
        pass
    iwalk.print_ellapsed()


asyncio.run(main())