Tags: python, asyncio, event-loop

Iterate through asyncio loop


I am very new to aiohttp and asyncio, so apologies for my ignorance up front. I am having difficulties with the event loop portion of the documentation and don't think my code below is executing asynchronously. I am trying to take all combinations of two lists via itertools and POST an XML payload for each one. A more full-blown version is listed here using the requests module, but that is not ideal, as I may need to POST 1000+ requests at a time. Here is a sample of how it looks now:

import aiohttp
import asyncio
import itertools

skillid = ['7715','7735','7736','7737','7738','7739','7740','7741','7742','7743','7744','7745','7746','7747','7748' ,'7749','7750','7751','7752','7753','7754','7755','7756','7757','7758','7759','7760','7761','7762','7763','7764','7765','7766','7767','7768','7769','7770','7771','7772','7773','7774','7775','7776','7777','7778','7779','7780','7781','7782','7783','7784']

agent= ['5124','5315','5331','5764','6049','6076','6192','6323','6669','7690','7716']

url = 'https://url'

user = 'user'
password = 'pass'
headers = {
        'Content-Type': 'application/xml'
      }

async def main():
    async with aiohttp.ClientSession() as session:
        for x in itertools.product(agent,skillid):
            payload = "<operation><operationType>update</operationType><refURLs><refURL>/unifiedconfig/config/agent/" + x[0] + "</refURL></refURLs><changeSet><agent><skillGroupsRemoved><skillGroup><refURL>/unifiedconfig/config/skillgroup/" + x[1] + "</refURL></skillGroup></skillGroupsRemoved></agent></changeSet></operation>"
            async with session.post(url, auth=aiohttp.BasicAuth(user, password), data=payload, headers=headers) as resp:
                print(resp.status)
                print(await resp.text())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

I see that coroutines can be used, but I'm not sure that applies here, as there is only a single task to execute. Any clarification is appreciated.


Solution

  • Because you're making a request and then immediately await-ing on it, you are only making one request at a time. If you want to parallelize everything, you need to separate making the request from waiting for the response, and you need to use something like asyncio.gather to wait for the requests in bulk.
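    The difference can be sketched minimally without aiohttp, using asyncio.sleep to stand in for an HTTP request (the fetch name and the 0.1-second delay here are just illustrative):

```python
import asyncio
import time

async def fetch(i):
    # Stand-in for an HTTP request that takes 0.1 seconds.
    await asyncio.sleep(0.1)
    return i

async def sequential():
    # Awaiting each call before starting the next: total time ~ n * 0.1s.
    return [await fetch(i) for i in range(5)]

async def concurrent():
    # Start all five coroutines, then wait for them together: total ~ 0.1s.
    return await asyncio.gather(*(fetch(i) for i in range(5)))

start = time.monotonic()
print(asyncio.run(sequential()))
print(f'sequential: {time.monotonic() - start:.2f}s')

start = time.monotonic()
print(asyncio.run(concurrent()))
print(f'concurrent: {time.monotonic() - start:.2f}s')
```

    Both versions produce the same results; only the elapsed time differs, because gather lets the sleeps overlap.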

    In the following example, I've modified your code to connect to a local httpbin instance for testing; I'm making requests to the /delay/<value> endpoint so that each request takes a random amount of time to complete.

    The theory of operation here is:

    • Move the request code into the asynchronous one_request function, which we use to build an array of tasks.

    • Use asyncio.gather to run all the tasks at once.

    • The one_request function returns an (agent, skillid, response) tuple, so that when we iterate over the responses we can tell which combination of parameters produced each response.

    import aiohttp
    import asyncio
    import itertools
    import random
    
    skillid = [
        "7715", "7735", "7736", "7737", "7738", "7739", "7740", "7741", "7742",
        "7743", "7744", "7745", "7746", "7747", "7748", "7749", "7750", "7751",
        "7752", "7753", "7754", "7755", "7756", "7757", "7758", "7759", "7760",
        "7761", "7762", "7763", "7764", "7765", "7766", "7767", "7768", "7769",
        "7770", "7771", "7772", "7773", "7774", "7775", "7776", "7777", "7778",
        "7779", "7780", "7781", "7782", "7783", "7784",
    ]
    
    agent = [
        "5124", "5315", "5331", "5764", "6049", "6076", "6192", "6323", "6669",
        "7690", "7716",
    ]
    
    user = 'user'
    password = 'pass'
    headers = {
        'Content-Type': 'application/xml',
    }
    
    
    async def one_request(session, agent, skillid):
        # I'm setting `url` here because I want a random parameter for
        # each request. You would probably just set this once globally.
        delay = random.randint(0, 10)
        url = f'http://localhost:8787/delay/{delay}'
    
        payload = (
            "<operation>"
            "<operationType>update</operationType>"
            "<refURLs>"
            f"<refURL>/unifiedconfig/config/agent/{agent}</refURL>"
            "</refURLs>"
            "<changeSet>"
            "<agent>"
            "<skillGroupsRemoved><skillGroup>"
            f"<refURL>/unifiedconfig/config/skillgroup/{skillid}</refURL>"
            "</skillGroup></skillGroupsRemoved>"
            "</agent>"
            "</changeSet>"
            "</operation>"
        )
    
        # This shows when the task actually executes.
        print('req', agent, skillid)
    
        async with session.post(
                url, auth=aiohttp.BasicAuth(user, password),
                data=payload, headers=headers) as resp:
            return (agent, skillid, await resp.text())
    
    
    async def main():
        tasks = []
        async with aiohttp.ClientSession() as session:
            # Add tasks to the `tasks` array
            for x in itertools.product(agent, skillid):
                task = asyncio.create_task(one_request(session, x[0], x[1]))
                tasks.append(task)
    
            print(f'making {len(tasks)} requests')
    
            # Run all the tasks and wait for them to complete. Return
            # values will end up in the `responses` list.
            responses = await asyncio.gather(*tasks)
    
            # Just print everything out.
            print(responses)
    
    
    asyncio.run(main())
    

    The above code makes 561 requests (11 agents × 51 skill ids) and runs in about 30 seconds with the random delays I've introduced.

    This code runs all the requests at once. If you wanted to limit the maximum number of concurrent requests, you could introduce an asyncio.Semaphore to make one_request block while too many requests are active.

    If you wanted to process responses as they arrived, rather than waiting for everything to complete, you could investigate the asyncio.wait method instead.
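    As a sketch of that idea, here is the related helper asyncio.as_completed, which yields results in completion order rather than submission order (asyncio.sleep again stands in for the request):

```python
import asyncio
import random

async def one_request(i):
    # Stand-in for an HTTP request with a variable delay.
    await asyncio.sleep(random.uniform(0, 0.1))
    return i

async def main():
    tasks = [asyncio.create_task(one_request(i)) for i in range(10)]
    finished = []
    # as_completed yields awaitables in the order the tasks finish,
    # so each response can be handled as soon as it arrives.
    for fut in asyncio.as_completed(tasks):
        result = await fut
        finished.append(result)
        print('done:', result)
    return finished

results = asyncio.run(main())
```

    The "done" lines print in whatever order the delays happen to finish, which is the point: no response waits on a slower one before being processed.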