The logic in the code is to pull data via (async) HTTP request and then build a large list of dictionaries where one of the values is randomly generated:
```python
import asyncio
import random
import string
import time
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from itertools import cycle

from httpx import AsyncClient

URL = 'http://localhost:8080'
COUNT = 1_000_000


def rand_str(length=10):
    return ''.join(random.choice(string.ascii_uppercase) for _ in range(length))


def parser(data, count):
    items = []
    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)
    return items


async def parser_coro(data, count):
    items = []
    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)
    return items


async def run_in_executor(func, pool, *args, **kwargs):
    loop = asyncio.get_running_loop()
    # loop.run_in_executor() does not forward keyword arguments,
    # so bind them with functools.partial
    return await loop.run_in_executor(pool, partial(func, *args, **kwargs))


async def main():
    async with AsyncClient(base_url=URL) as client:
        r = await client.get('/api/alerts/')
        data = r.json()

    # Case 1
    t1 = time.perf_counter()
    parser(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 1 - sync: {t2 - t1:.3f}s')

    # Case 2
    t1 = time.perf_counter()
    await parser_coro(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 2 - coro (no await): {t2 - t1:.3f}s')

    # Case 3
    t1 = time.perf_counter()
    await run_in_executor(parser, None, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 3 - thread executor: {t2 - t1:.3f}s')

    # Case 4
    t1 = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        await run_in_executor(parser, executor, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 4 - process executor: {t2 - t1:.3f}s')


if __name__ == '__main__':
    asyncio.run(main(), debug=True)
```
Test:

```shell
$ python test.py
Case 1 - sync: 6.593s
Case 2 - coro (no await): 6.565s
Executing <Task pending name='Task-1' coro=<main() running at test.py:63> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efff962a1f0>()] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:184] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:591> took 13.176 seconds
Case 3 - thread executor: 6.675s
Case 4 - process executor: 6.726s
```
Question:

Should I run the `parser` function in an executor so it doesn't block the main thread while the list is generated, or won't that help in this case? Is this actually a CPU-bound or an I/O-bound workload? I guess there is no I/O, but is building the list a CPU-intensive task, making the workload CPU-bound?
> Should I run the `parser` function in the executor so it's not blocking the main thread while the list is generated or it won't help in this case?
Yes, you should. Despite the global interpreter lock, using a separate thread will help because Python allows execution to switch from the parser to the asyncio thread without `parser` being aware of it. Using a thread thus prevents the event loop from being blocked for six seconds, or however long it takes to run the function.
Note that the `parser_coro` variant is no different from the executor-less `parser` variant because it doesn't await anything. `await parser_coro(...)` will halt the event loop just like an executor-less call to `parser(...)`.
> Is this actually a CPU or I/O bound workload in this case?

I can't comment on the rest of the workload, but the function as written is definitely CPU-bound.
> Can I run this in the `ThreadPoolExecutor` so it's not blocking, or must it be a `ProcessPoolExecutor` as it's a CPU-bound function?
You can run it in a `ThreadPoolExecutor`, sure. It's just that if you have a bunch of those running in parallel, they will all share the same CPU core because of the GIL. (But they won't block other coroutines, because they run off the event-loop thread.)