Tags: python-3.x, api, asynchronous, python-asyncio, ab-testing

Using asyncio for A/B testing in Python


Let's say there's an API already running in production, and you've built a new API that you want to A/B test against the incoming requests hitting the production API. I'm aware that people usually do this with a traffic split between two API versions, but I was wondering whether something like the following is possible:

As soon as a request arrives at the production API, you fire off an async request to the new API and carry on with the rest of the production code. Then, just before returning the final response to the caller, you check whether the async task you created earlier has finished. If its result is available, you return that instead of the production API's response.

What's the best way to do something like this? Should we write a decorator for it, or something else? I'm a bit worried about the many edge cases that can come up if we use async here. Does anyone have pointers on improving the code or the whole approach?

Thanks for your time!


Some pseudo-code for the approach above:

import asyncio

def call_old_api():
    pass

async def call_new_api():
    pass

async def main():
    task = asyncio.Task(call_new_api())

    oldResp = call_old_api()
    resp = await task

    if task.done():
        return resp
    else:
        task.cancel() # maybe
        return oldResp

asyncio.run(main())

Solution

  • You can't just call call_old_api() inside an asyncio coroutine: it is a blocking call, so while it runs the event loop is stuck and the new-API task can't make any progress. There's a detailed explanation of why here. Please make sure you understand it, because depending on how your server works you may not be able to do what you want (for example, running an async API on a sync server while keeping the benefits of async code).
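
To make the blocking problem concrete, here is a minimal sketch. The two API functions are hypothetical stand-ins built on sleep calls, and the timings are made up for illustration. With these timings the first variant reports False, because the new-API task never gets a chance to run while the sync call holds the event loop; the second variant should report True.

```python
import asyncio
import time


async def call_new_api():
    # Hypothetical stand-in for the new API: ~0.05 s of non-blocking work.
    await asyncio.sleep(0.05)
    return "NEW"


def call_old_api():
    # Hypothetical stand-in for the old sync API: blocks its thread for ~0.3 s.
    time.sleep(0.3)
    return "OLD"


async def blocking_variant():
    task = asyncio.create_task(call_new_api())
    call_old_api()  # blocks the event loop; the task cannot even start
    finished = task.done()
    await task  # let the task finish so nothing is left pending
    return finished


async def threaded_variant():
    loop = asyncio.get_running_loop()
    task = asyncio.create_task(call_new_api())
    await loop.run_in_executor(None, call_old_api)  # loop stays free meanwhile
    finished = task.done()
    await task
    return finished


if __name__ == "__main__":
    print("blocking call, new API finished in time:", asyncio.run(blocking_variant()))
    print("threaded call, new API finished in time:", asyncio.run(threaded_variant()))
```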

    If you understand what you're doing and you have an async server, you can call the old sync API in a thread and use a task to run the new API:

    task = asyncio.create_task(call_new_api())
    # Pass the function itself, not its result, so it runs in a worker thread
    oldResp = await in_thread(call_old_api)

    if task.done():
        # task.result() re-raises the exception if the new API request failed,
        # but that's probably OK for you
        return task.result()
    else:
        # Cancel the unfinished task, but take care when cancelling,
        # see https://stackoverflow.com/a/43810272/1113207
        task.cancel()
        return oldResp
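
If a failing new API should not break the production response, one option is to treat an exception the same way as "not finished" and fall back to the old result. The helper below is only a sketch: new_or_fallback is a name I made up, and whether silently swallowing the failure is acceptable depends on your setup (you would probably want to log it).

```python
import asyncio


async def new_or_fallback(new_task, old_resp):
    """Return the new API's result if it finished cleanly, else old_resp."""
    if not new_task.done():
        new_task.cancel()
        # Wait until the cancellation has been processed; asyncio.wait does
        # not re-raise whatever the task ends with.
        await asyncio.wait([new_task])
        return old_resp
    if new_task.cancelled() or new_task.exception() is not None:
        # The new API failed (or was cancelled elsewhere): ignore it.
        return old_resp
    return new_task.result()


async def demo():
    # Three hypothetical new-API behaviours: success, failure, too slow.
    async def ok():
        return "NEW"

    async def boom():
        raise RuntimeError("new API failed")

    async def slow():
        await asyncio.sleep(10)
        return "NEW"

    results = []
    for coro in (ok(), boom(), slow()):
        task = asyncio.create_task(coro)
        await asyncio.sleep(0.01)  # pretend the old API call happened here
        results.append(await new_or_fallback(task, "OLD"))
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['NEW', 'OLD', 'OLD']
```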
    

    I think you can go even further: instead of always waiting for the old API to complete, you can run both APIs concurrently and return whichever finishes first (in case the new API is faster than the old one). With all the checks and suggestions above, it should look something like this:

    import asyncio
    import random
    import time
    from contextlib import suppress
    
    
    def call_old_api():
        time.sleep(random.randint(0, 2))
        return "OLD"
    
    
    async def call_new_api():
        await asyncio.sleep(random.randint(0, 2))
        return "NEW"
    
    
    async def in_thread(func):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func)
    
    
    async def ensure_cancelled(task):
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
    
    
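    async def main():
        # create_task is the documented way to schedule coroutines on the running loop
        old_api_task = asyncio.create_task(in_thread(call_old_api))
        new_api_task = asyncio.create_task(call_new_api())

        done, pending = await asyncio.wait(
            [old_api_task, new_api_task], return_when=asyncio.FIRST_COMPLETED
        )

        # Cancelling the old-API task only stops waiting for it;
        # the worker thread itself runs to completion.
        for task in pending:
            await ensure_cancelled(task)

        # If both finished at the same moment, done holds two tasks and pop() picks either
        finished_task = done.pop()
        res = finished_task.result()
        print(res)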
    
    
    asyncio.run(main())
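
As for the decorator idea from the question, the race above could be wrapped roughly like this. Treat it as a sketch under assumptions, not a finished design: race_with is a hypothetical name, it assumes the old API is a plain sync function and the new API is a coroutine function taking the same arguments, and it prefers the new result only when the new API finished first without raising.

```python
import asyncio
import functools
import time


def race_with(new_api):
    """Hypothetical decorator: race a sync old API against an async new_api."""

    def decorator(old_api):
        @functools.wraps(old_api)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            # Run the blocking old API in the default thread pool.
            old_future = loop.run_in_executor(
                None, functools.partial(old_api, *args, **kwargs)
            )
            new_task = asyncio.create_task(new_api(*args, **kwargs))
            done, _ = await asyncio.wait(
                [old_future, new_task], return_when=asyncio.FIRST_COMPLETED
            )
            if (
                new_task in done
                and not new_task.cancelled()
                and new_task.exception() is None
            ):
                # Stop waiting for the old API; its thread still runs to the end.
                old_future.cancel()
                return new_task.result()
            if not new_task.done():
                new_task.cancel()
                await asyncio.wait([new_task])
            # The new API lost the race or failed: use the old result.
            return await old_future

        return wrapper

    return decorator


async def call_new_api(user_id):
    # Made-up stand-in for the new API.
    await asyncio.sleep(0.01)
    return f"NEW:{user_id}"


@race_with(call_new_api)
def call_old_api(user_id):
    # Made-up stand-in for the old sync API.
    time.sleep(0.2)
    return f"OLD:{user_id}"


if __name__ == "__main__":
    print(asyncio.run(call_old_api(42)))
```

Note that the decorated function becomes a coroutine function, so this only fits a server whose handlers are awaited on an event loop.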