python-3.6 python-asyncio aiohttp

Async code running synchronously, though it doesn't seem to have any blocking lines


Running on Windows 10 with Python 3.6.3 inside the PyCharm IDE, this code:

import asyncio
import json
import datetime
import time
from aiohttp import ClientSession

# BASE_URL, token, groupid, start_ms and end_ms are defined elsewhere (not shown)


async def get_tags():
    url_tags = f"{BASE_URL}tags?access_token={token}"
    async with ClientSession() as session:
        async with session.get(url_tags) as response:
            return await response.read()


async def get_trips(vehicles):
    url_trips = f"{BASE_URL}fleet/trips?access_token={token}"
    for vehicle in vehicles:
        body_trips = {"groupId": groupid, "vehicleId": vehicle['id'], "startMs": int(start_ms), "endMs": int(end_ms)}
        async with ClientSession() as session:
            async with session.post(url_trips, json=body_trips) as response:
                yield response.read()  # yields the un-awaited read() coroutine; the caller awaits it


async def main():
    tags = await get_tags()
    tag_list = json.loads(tags.decode('utf8'))['tags']
    veh = tag_list[0]['vehicles'][0:5]
    return [await v async for v in get_trips(veh)]

t1 = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
t2 = time.time()
print(t2 - t1)

It seems to run completely synchronously: the time increases linearly with the size of the loop. Going by the examples in a book I read, "Using Asyncio in Python 3", the code should be asynchronous; am I missing something here? Similar code in C# completes about 2,000 requests in a few seconds, but here it takes about 14 s to run 20 requests (6 s to run 10).

Edit:

I rewrote some of the code:

async def get_trips(vehicle):
    url_trips = f"{BASE_URL}fleet/trips?access_token={token}"
    #for vehicle in vehicles:
    body_trips = {"groupId": groupid, "vehicleId": vehicle['id'], "startMs": int(start_ms), "endMs": int(end_ms)}
    async with ClientSession() as session:
        async with session.post(url_trips, json=body_trips) as response:
            res = await response.read()
            return res


t1 = time.time()
loop = asyncio.new_event_loop()
x = loop.run_until_complete(get_tags())
tag_list = json.loads(x.decode('utf8'))['tags']
veh = tag_list[0]['vehicles'][0:10]
tasks = []
for v in veh:
    tasks.append(loop.create_task(get_trips(v)))

loop.run_until_complete(asyncio.wait(tasks))
t2 = time.time()
print(t2 - t1)

This does run asynchronously, but now I can't get at the return value of my get_trips function, and I don't see a clear way to use it. Nearly all the tutorials I've seen just print the result, which doesn't help here. I'm a little confused about how async is supposed to work in Python, and why some things marked with the async keyword run synchronously while others don't.
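For reference, the return values are not lost when asyncio.wait is used: it returns two sets of tasks (done and pending), and each finished task stores its coroutine's return value, which task.result() retrieves. A minimal offline sketch, assuming a hypothetical fake_get_trips coroutine that sleeps instead of making the real HTTP POST, and made-up vehicle dicts:

```python
import asyncio


async def fake_get_trips(vehicle):
    # Hypothetical stand-in for get_trips(): sleeps instead of calling the API
    await asyncio.sleep(0.1)
    return {"vehicleId": vehicle["id"], "trips": []}


def collect_with_wait(vehicles):
    loop = asyncio.new_event_loop()
    try:
        # Keep our own list of tasks, in the same order as the vehicles
        tasks = [loop.create_task(fake_get_trips(v)) for v in vehicles]
        # asyncio.wait returns (done, pending); with the default
        # return_when=ALL_COMPLETED, every task is finished afterwards
        done, pending = loop.run_until_complete(asyncio.wait(tasks))
        # Each finished task holds the value its coroutine returned
        return {v["id"]: t.result() for v, t in zip(vehicles, tasks)}
    finally:
        loop.close()


vehicles = [{"id": 1}, {"id": 2}, {"id": 3}]
results = collect_with_wait(vehicles)
print(results)
```

Keeping a separate list of tasks matters here because done is an unordered set; iterating the original list preserves the vehicle order.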

The simple question is: how do I add the return value of a task to a list or dictionary? The more advanced question: could someone explain why my code in the first example runs synchronously while the code in the second part runs asynchronously?

Edit 2:

Replacing:

loop.run_until_complete(asyncio.wait(tasks))

with:

x = loop.run_until_complete(asyncio.gather(*tasks))

Fixes the simple problem; now I'm just curious why the async list comprehension doesn't run asynchronously.
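asyncio.gather returns its results as a list in the same order as the awaitables passed to it, regardless of which one finishes first, so the results can be zipped back together with the vehicles. A sketch under stated assumptions: fake_get_trips is a hypothetical stand-in for the real request, and the delays are chosen so that later vehicles finish first:

```python
import asyncio


async def fake_get_trips(vehicle, delay):
    # Hypothetical stand-in for get_trips(): sleeps instead of calling the API
    await asyncio.sleep(delay)
    return "trips for vehicle {}".format(vehicle["id"])


async def main(vehicles):
    # Deliberately uneven delays: the last vehicle completes first
    delays = [0.15, 0.10, 0.05]
    # gather runs the coroutines concurrently and returns their results
    # in argument order, not in completion order
    results = await asyncio.gather(
        *(fake_get_trips(v, d) for v, d in zip(vehicles, delays))
    )
    return dict(zip((v["id"] for v in vehicles), results))


loop = asyncio.new_event_loop()
try:
    trips_by_vehicle = loop.run_until_complete(main([{"id": 0}, {"id": 1}, {"id": 2}]))
finally:
    loop.close()
print(trips_by_vehicle)
```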


Solution

  • now just curious why the async list comprehension doesn't run asynchronously

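    Because your comprehension iterates over an async generator that hands out one awaitable at a time, which you then await immediately, thus killing the concurrency. The generator only moves on to the next vehicle, and therefore only starts the next request, after you have awaited the previous item. That is roughly equivalent to this: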

    for vehicle in vehicles:
        trips = await get_trips(vehicle)
        # do something with trips
    

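    To make it concurrent, you can use wait or gather as you've already discovered, but those are not mandatory. As soon as you create a task, it is scheduled on the event loop and starts running alongside the other tasks the next time the current coroutine gives up control (for example at an await). For example, this should work as well (inside a coroutine, with loop being the running event loop):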

    # step 1: create the tasks and store (task, vehicle) pairs in a list
    tasks = [(loop.create_task(get_trips(v)), v)
             for v in vehicles]
    
    # step 2: await them one by one, while the others keep running:
    trips_by_vehicle = {}
    for t, v in tasks:
        trips = await t
        # do something with trips for vehicle v, e.g. collect it in a dict
        trips_by_vehicle[v['id']] = trips
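The difference between the two patterns can be checked offline by timing them. This is a sketch, not the original code: fake_request is a hypothetical coroutine that sleeps for DELAY seconds in place of the HTTP round trip, vehicles are plain integers, and asyncio.ensure_future is used instead of loop.create_task so the coroutine doesn't need a loop variable. The async-generator version should take roughly N × DELAY, while creating all the tasks up front should take roughly DELAY in total.

```python
import asyncio
import time

DELAY = 0.2  # hypothetical per-request latency, in seconds


async def fake_request(vehicle):
    # Hypothetical stand-in for the HTTP POST in get_trips()
    await asyncio.sleep(DELAY)
    return vehicle * 10


async def trips_generator(vehicles):
    # Same shape as the question's first get_trips(): the next request
    # is only created after the consumer has awaited the previous item
    for v in vehicles:
        yield fake_request(v)


async def sequential(vehicles):
    # The question's async comprehension: each awaitable is awaited
    # immediately, so the requests run one after another
    return [await c async for c in trips_generator(vehicles)]


async def concurrent(vehicles):
    # Step 1: create all tasks up front so they run together
    tasks = [(asyncio.ensure_future(fake_request(v)), v) for v in vehicles]
    # Step 2: await them one by one while the others keep running
    results = {}
    for t, v in tasks:
        results[v] = await t
    return results


def timed(coro):
    # Run a coroutine on a fresh event loop and measure wall-clock time
    loop = asyncio.new_event_loop()
    try:
        start = time.perf_counter()
        result = loop.run_until_complete(coro)
        return result, time.perf_counter() - start
    finally:
        loop.close()


seq_result, seq_time = timed(sequential(range(5)))
con_result, con_time = timed(concurrent(range(5)))
print(seq_result, round(seq_time, 2))  # roughly 5 * DELAY
print(con_result, round(con_time, 2))  # roughly DELAY
```

Both versions produce the same values; only the scheduling differs, which is why the first example scales linearly with the number of vehicles.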