python for-loop python-requests python-asyncio

How to iterate over an API request loop concurrently with asyncio in Python


My code makes two API GET requests and compares the results. I iterate over a dictionary in a for loop to determine where to point each GET request. Sending a GET, waiting for the reply, doing the math, and then moving to the next item in the dictionary takes a long time because of the time spent waiting for each reply.

There are numerous discussions suggesting that asyncio is a solution to this, but the code snippets I see in the answers don't seem to help (or I can't figure out how to apply them). See here, here, here for excellent discussions of asyncio. I'm fairly certain that batching the requests would speed up iteration significantly, i.e. send 20 requests (an arbitrary number), get 20 responses, compare the outputs, repeat.

Here is my non-async code, modified a bit for readability.

value_ops = {'name1':['text','ticker1','ticker2','ticker3'],
'name2':['text','ticker1','ticker2','ticker3'],
...
}

# These are the two API GET requests
def pm_check(od_pm,pSide):
    #[code chunk here] output is json that I'm grabbing two values from


def ks_check(od_ks,kSide,direction):
    #[code chunk here] output is json that I'm grabbing two values from

def opp_check(tradename,ticker1,ticker2,ticker3):
    pm_check(ticker1,'asks')
    ks_check(ticker2,'yes','buy')
    #comparison math code chunk omitted

for value in value_ops.values():
    opp_check(*value)

So the working code iterates over the dictionary to grab the values, feeds them into the API call functions, does some calculation, and then moves on to the next value and repeats.

I think the end goal would be to send all the API requests at the same time, store the responses in a table, and then run the calculations on that entire table, i.e. in batches of 20 or so.

What I was trying as a starting point was:

value_ops = {'name1':['tradename','ticker1','ticker2','ticker3'],
'name2':['tradename','ticker1','ticker2','ticker3'],
...
}

# These are the two API GET requests
async def pm_check(od_pm,pSide):
    #[code chunk here] output is json that I'm grabbing two values from


async def ks_check(od_ks,kSide,direction):
    #[code chunk here] output is json that I'm grabbing two values from

async def opp_check(tradename,ticker1,ticker2,ticker3):
    pm_check(ticker1,'asks')
    ks_check(ticker2,'yes','buy')
    #comparison math code chunk omitted

import asyncio
async def process_all():
    tasks = []
    async for value in value_ops.values():
        task = asyncio.create_task(opp_check(*value))
        tasks.append(task)
    await asyncio.gather(*tasks)
    
asyncio.run(process_all())

I was hoping this would give me a proof of concept for async iteration over my loop. It throws a RuntimeError [asyncio.run() cannot be called from a running event loop]. I suspect that even if I get past this error, the result won't actually be what I'm looking for.

Any feedback on how to speed this up is appreciated. I also tried multiprocessing, but that did nothing to speed things up (which I think makes sense: this isn't a CPU issue, it's the downtime while waiting for the GET reply).

EDIT: I'm using an Anaconda environment, if relevant.


Solution

  • A single asyncio.run call at the bottom of the script is how it should be used if you are running a standalone script. Jupyter and other runtime environments may already have an asyncio loop running, and since you are not calling asyncio.run from within your own functions, that is what is happening here.

    In that case you can create your gather call as an asyncio task, so that it can be started from a synchronous function, and then just wait for it to complete (in an interactive environment, you can manually check its .done() method).

    Otherwise, yes: you organized your tasks in a way that takes advantage of asyncio, as long as the API calls themselves are not blocking and the coroutines you call are actually awaited. requests is blocking, but you can switch to aiohttp or httpx. httpx in particular is close to a drop-in replacement for requests, except that the actual .get or .post calls can be awaited, so while requests are in flight and results come back, the event loop processes other requests concurrently.
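
To see the effect of awaiting without touching a real API, here is a minimal sketch. fake_api_call is a hypothetical stand-in that uses asyncio.sleep in place of an awaited GET; the names and delays are assumptions for illustration and are not part of the original code.

```python
import asyncio
import time


async def fake_api_call(name: str, delay: float) -> str:
    # Hypothetical stand-in for an awaited HTTP GET: asyncio.sleep hands
    # control back to the event loop, as an awaited network call would.
    await asyncio.sleep(delay)
    return f"{name}-response"


async def compare_pair() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Both coroutines wait at the same time, so the total is roughly the
    # longest single delay rather than the sum of the two.
    results = await asyncio.gather(
        fake_api_call("pm", 0.3),
        fake_api_call("ks", 0.3),
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    # Standalone script only; in a notebook use `await compare_pair()`.
    results, elapsed = asyncio.run(compare_pair())
    print(results, f"{elapsed:.2f}s")
```

If the stand-in used time.sleep instead, each call would hold the event loop for its full delay and the two waits would add up, which is the situation a blocking requests call creates.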

    %pip install httpx
    
    import asyncio
    import httpx
    
    value_ops = {'name1':['tradename','ticker1','ticker2','ticker3'],
    'name2':['tradename','ticker1','ticker2','ticker3'],
    ...
    }
    
    # These are the two API GET requests
    async def pm_check(od_pm,pSide):
        #[code chunk here] output is json that I'm grabbing two values from
        async with httpx.AsyncClient() as client:
            # if answer2 doesn't depend on answer1,
            # these two can be made concurrent as well
            answer1 = await client.get(...)
            answer2 = await client.get(...)
            
    
    
    async def ks_check(od_ks,kSide,direction):
        ...
        #[code chunk here] output is json that I'm grabbing two values from
    
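    async def opp_check(tradename,ticker1,ticker2,ticker3):
        # both coroutines must be awaited; calling them without
        # `await` only creates coroutine objects that never run
        await pm_check(ticker1,'asks')
        await ks_check(ticker2,'yes','buy')
        #comparison math code chunk omitted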
    
    async def process_all():
        tasks = []
        # a dict view is a regular iterable: use a plain `for`, not `async for`
        for value in value_ops.values():
            task = asyncio.create_task(opp_check(*value))
            tasks.append(task)
        # The gather pattern gives you concurrency.
        # The only downside is that it creates __all__ your
        # requests in one go - if there is
        # some API limit or throttling, you might
        # want to use a more detailed pattern to
        # ensure you emit just a reasonable number
        # of requests at a time.
        await asyncio.gather(*tasks)
        
        
    # and here, if you are already running inside an async loop, this will fail:
    # asyncio.run(process_all())
    
    # But you can create the `process_all` call as a task and check for
    # its completion:
    
    main_task = asyncio.create_task(process_all())
    
    # and then, in another cell, check the return value of `main_task.done()`
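
One possible form of the "more detailed pattern" mentioned in the comments above is to cap the number of in-flight calls with asyncio.Semaphore. The sketch below is only an illustration under assumptions: fake_opp_check and run_limited are hypothetical names, asyncio.sleep stands in for the real API calls, and the default limit of 20 simply echoes the arbitrary batch size from the question.

```python
import asyncio


async def fake_opp_check(name: str, state: dict) -> str:
    # Hypothetical stand-in for opp_check; records how many calls overlap.
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)  # simulated network wait
    state["active"] -= 1
    return name


async def run_limited(names: list[str], limit: int = 20) -> tuple[list[str], int]:
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}

    async def guarded(name: str) -> str:
        # At most `limit` coroutines are inside this block at any moment;
        # the rest wait here until a slot is released.
        async with semaphore:
            return await fake_opp_check(name, state)

    # gather returns results in the same order as the inputs.
    results = await asyncio.gather(*(guarded(n) for n in names))
    return results, state["peak"]


if __name__ == "__main__":
    # Standalone script only; in a notebook use `await run_limited(...)`.
    names = [f"name{i}" for i in range(50)]
    results, peak = asyncio.run(run_limited(names, limit=5))
    print(len(results), "results, peak concurrency", peak)
```

Unlike sending fixed batches of 20 and waiting for each batch to finish, a semaphore starts a new call as soon as any earlier one completes, so one slow response does not hold up the rest.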