Search code examples
python, python-3.x, python-requests, python-asyncio, aiohttp

Why does asyncio's run_in_executor give so little parallelization when making HTTP requests?


I've written a benchmark utility that batch queries a REST endpoint. It does it in three ways:

  1. sequentially, using the requests library,
  2. concurrently, using the requests library, but wrapping each request with loop.run_in_executor(),
  3. concurrently, using the aiohttp library.

Below are the results with different batch sizes:

batch_size=16

       concur_times  seq_times  concur_aiohttp_times
count     50.000000  50.000000             50.000000
mean       0.123786   0.235883              0.087843
std        0.009733   0.018039              0.029977
min        0.108682   0.210515              0.071560
25%        0.118666   0.222436              0.075565
50%        0.121978   0.231876              0.080050
75%        0.125740   0.242939              0.086345
max        0.169194   0.283809              0.267874

batch_size=4

       concur_times  seq_times  concur_aiohttp_times
count     50.000000  50.000000             50.000000
mean       0.080764   0.091276              0.052807
std        0.008342   0.016509              0.033814
min        0.069541   0.078517              0.041993
25%        0.076142   0.082242              0.044563
50%        0.079046   0.085540              0.045735
75%        0.081645   0.092659              0.049428
max        0.111622   0.170785              0.281397

As the results show, the aiohttp routine is consistently more parallel by a wide margin. What's more, with a small batch size (4), the second approach, which uses loop.run_in_executor (the "concur_times" column), achieves a speed-up of only about 1/9 over the sequential method.

Why is that? Is my code at fault? I'm including it below.

I've tried swapping out the network I/O for sleep and asyncio.sleep, and that produced the expected results: methods 2 and 3 were equally fast, and method 1 was batch_size times slower.

Code:

import asyncio
import requests
from cytoolz.curried import *
import pandas as pd
from timeit import default_timer as now

# Base endpoint of the REST service; the downloaders append the todo number
# to this string to build each request URL.
url = 'https://jsonplaceholder.typicode.com/todos/'

def dl_todo_with_requests(session, n):
    """Download todo number ``n`` through ``session`` and return its body text.

    Args:
        session: Object exposing ``get(url)`` whose result has ``status_code``
            and ``text`` attributes (a ``requests.Session`` in this file).
        n: Todo identifier; its ``str()`` form is appended to the
            module-level ``url``.

    Returns:
        The ``text`` attribute of the response.

    Raises:
        AssertionError: If the response status code is not 200.
    """
    target = url + str(n)
    response = session.get(target)
    # Raised explicitly rather than through an ``assert`` statement so the
    # check is still performed when Python runs with -O (which removes
    # asserts), and so that the failure says which request went wrong.
    if response.status_code != 200:
        raise AssertionError(
            "GET %s returned HTTP %s" % (target, response.status_code))
    return response.text

# Rebind the downloader to its curried form so that calling it with only
# ``session`` yields a one-argument callable (it is used that way with ``map``
# in ``seq_dl``).
dl_todo_with_requests = curry(dl_todo_with_requests)

def seq_dl(todos_to_get):
    """Download the requested todos one after another.

    A single ``requests.Session`` is opened for the whole batch and closed
    when the batch is done.

    Args:
        todos_to_get: Iterable of todo numbers.

    Returns:
        A list with the text of each todo, in the order of ``todos_to_get``.
    """
    with requests.Session() as session:
        # Partially apply the curried downloader to the shared session.
        fetch = dl_todo_with_requests(session)
        return [fetch(n) for n in todos_to_get]

def get_todos_from_futures(futures):
    """Collect the result of every future into a list.

    Args:
        futures: Iterable of objects exposing a zero-argument ``result()``
            method (completed futures in this file).

    Returns:
        A list of the ``result()`` values, in the iteration order of
        ``futures``.

    Raises:
        Whatever ``result()`` raises for a given future.
    """
    return [fut.result() for fut in futures]

async def concur_dl(todos_to_get):
    """Download the requested todos concurrently on the default executor.

    Every todo is fetched with ``dl_todo_with_requests`` in a worker thread
    of the running loop's default executor; all workers share one
    ``requests.Session``.

    Args:
        todos_to_get: Iterable of todo numbers.

    Returns:
        A list with the text of each todo, in the order of ``todos_to_get``
        (empty when ``todos_to_get`` is empty).

    Raises:
        The first exception (in request order) raised by any download, after
        every download has finished.
    """
    loop = asyncio.get_running_loop()
    with requests.Session() as session:
        # NOTE(review): a single Session is shared by all worker threads;
        # confirm that this is acceptable for requests.
        submitted = [
            # Arguments are handed to run_in_executor directly, so no closure
            # over the loop variable is needed.
            loop.run_in_executor(None, dl_todo_with_requests, session, n)
            for n in todos_to_get
        ]
        # gather() returns results in submission order, whereas
        # asyncio.wait() returns an unordered set and rejects an empty batch.
        # return_exceptions=True makes it wait for every worker thread before
        # the session is closed, because running threads cannot be cancelled.
        outcomes = await asyncio.gather(*submitted, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                raise outcome
        return outcomes

import aiohttp
async def concur_dl_aiohttp(todos_to_get):
    """Download the requested todos concurrently with aiohttp.

    Args:
        todos_to_get: Iterable of todo numbers.

    Returns:
        A list with the text of each todo, in the order of ``todos_to_get``
        (empty when ``todos_to_get`` is empty).

    Raises:
        AssertionError: If any response status is not 200.
    """
    async def dl(session, todo):
        """Fetch a single todo and return its decoded body."""
        target = url + str(todo)
        async with session.get(target) as resp:
            if resp.status != 200:
                raise AssertionError(
                    "GET %s returned HTTP %s" % (target, resp.status))
            # text() is a coroutine: it has to be awaited, and awaited while
            # the response is still open, otherwise the body is never read
            # and the caller receives a coroutine object instead of a string.
            return await resp.text()

    async with aiohttp.ClientSession() as session:
        downloads = [dl(session, todo) for todo in todos_to_get]
        # gather() accepts coroutines and keeps the results in request order;
        # asyncio.wait() no longer accepts bare coroutines (rejected since
        # Python 3.11) and returns an unordered set.
        return list(await asyncio.gather(*downloads))


def check_todos_received(todos, expected_count=None):
    """Validate a downloaded batch of todos.

    Args:
        todos: Sized collection of downloaded todo texts.
        expected_count: Number of todos the batch must contain. When omitted,
            the length of a module-level ``todos_to_get`` is used if such a
            global exists; otherwise the size check is skipped.

    Returns:
        ``True`` when every check passes.

    Raises:
        AssertionError: If the batch has the wrong size, or if any todo is
            not longer than the text ``'{}'``.
    """
    if expected_count is None:
        # Legacy behaviour: the size used to be compared against a global
        # ``todos_to_get``. The visible code only defines that name as a
        # local of ``bench``, so reading it unconditionally raises NameError;
        # look it up defensively instead.
        legacy_batch = globals().get('todos_to_get')
        if legacy_batch is not None:
            expected_count = len(legacy_batch)

    # Explicit raises keep the checks alive under ``python -O``.
    if expected_count is not None and len(todos) != expected_count:
        raise AssertionError(
            "expected %d todos, received %d" % (expected_count, len(todos)))
    if not all(len(todo) > len('{}') for todo in todos):
        raise AssertionError("received a todo without content")
    return True

def measure_it(f):
    """Call ``f`` with no arguments and return how long the call took.

    The duration is the difference between two readings of ``now``, taken
    immediately before and after the call; the return value of ``f`` is
    discarded.
    """
    began_at = now()
    f()
    return now() - began_at

@curry
def inspect(f, it):
    """Lazily pass the items of ``it`` through, calling ``f`` on each one.

    ``do`` and ``map`` are expected to be the curried helpers star-imported
    from ``cytoolz.curried``. The result is a lazy iterator: ``f`` is only
    called as the returned iterator is consumed, so an unconsumed result
    triggers no calls at all.
    """
    call_then_pass_through = do(f)
    return map(call_then_pass_through, it)

def bench(n_iters=50, batch_size=4):
    """Time the three download strategies and return the raw timings.

    Each strategy downloads todos ``1..batch_size`` and is timed ``n_iters``
    times with ``measure_it``. The strategies run in this order: concurrent
    (``concur_dl``), aiohttp (``concur_dl_aiohttp``), sequential (``seq_dl``).
    Every downloaded batch is validated inside the timed call.

    Args:
        n_iters: Number of timed batch downloads per strategy.
        batch_size: Number of todos requested in every batch.

    Returns:
        A dict with the keys ``concur_times``, ``seq_times`` and
        ``concur_aiohttp_times``, each mapping to a list of ``n_iters``
        durations as returned by ``measure_it``.

    Raises:
        AssertionError: If a downloaded batch does not contain ``batch_size``
            todos, or contains a todo that is not longer than ``'{}'``.
    """
    todos_to_get = range(1, batch_size + 1)
    # heat caches, if any
    seq_dl(todos_to_get)

    def verified(download):
        """Wrap ``download`` so that each batch it returns is validated."""
        def run():
            todos = download()
            # Validate eagerly and on the whole batch: routing the result
            # through a lazy ``map`` that nothing consumes would silently
            # skip every check.
            if len(todos) != batch_size:
                raise AssertionError(
                    "expected %d todos, received %d"
                    % (batch_size, len(todos)))
            if not all(len(todo) > len('{}') for todo in todos):
                raise AssertionError("received a todo without content")
        return run

    def do_the_bench(dl_f, title):
        """Time ``dl_f`` ``n_iters`` times, announcing each run beforehand."""
        times = []
        for i in range(n_iters):
            # Printed before the timer starts, so it is not part of the sample.
            print("doing %s/%s %s batch download" % (i + 1, n_iters, title))
            times.append(measure_it(dl_f))
        return times

    measure_seq = verified(lambda: seq_dl(todos_to_get))
    measure_concur = verified(
        lambda: asyncio.run(concur_dl(todos_to_get)))
    measure_concur_aiohttp = verified(
        lambda: asyncio.run(concur_dl_aiohttp(todos_to_get)))

    concur_times = do_the_bench(measure_concur, 'concurrent')
    concur_aiohttp_times = do_the_bench(
        measure_concur_aiohttp, 'concurrent_aiohttp')
    seq_times = do_the_bench(measure_seq, 'sequential')
    return dict(
        concur_times=concur_times,
        seq_times=seq_times,
        concur_aiohttp_times=concur_aiohttp_times)

The benchmark is run like this: bench(n_iters=50,batch_size=4). Then pass the output through lambda output: pandas.DataFrame(output).describe() to produce the tables.


Solution

  • The default executor for asyncio's run_in_executor is ThreadPoolExecutor, which uses Python threads. So it is also affected by the GIL, as described in this thread.

    In your case, only one of the threads running the asynchronous jobs executes at a time, which results in aiohttp showing better performance.