Handling ensure_future and its missing tasks

I have a streaming application that almost continuously takes input data, sends an HTTP request built from each value, and then processes the returned value.

To speed things up I use the asyncio and aiohttp libraries in Python 3.7, but debugging is hard given how fast the data moves.

This is what my code looks like:

# Gets the final requests
async def apiRequest(info, url, session, reqType, post_data=''):
    if reqType:
        async with session.post(url, data=post_data) as response:
            info['response'] = await response.text()
    else:
        async with session.get(url+post_data) as response:
            info['response'] = await response.text()
    return info

# Loops through the batches and sends it for request
async def main(data, listOfData):
    tasks = []
    async with ClientSession() as session:
        for reqData in listOfData:
            try:
                task = asyncio.ensure_future(apiRequest(**reqData))
                tasks.append(task)
            except Exception as e:
                exc_type, exc_obj, exc_tb = sys.exc_info()
                fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
                print(exc_type, fname, exc_tb.tb_lineno)
        responses = await asyncio.gather(*tasks)
    return responses  # list of APIResponses

# Streams data in and prepares batches to send for requests
async def Kconsumer(data, loop, batchsize=100):
    consumer = AIOKafkaConsumer(**KafkaConfigs)
    await consumer.start()
    dataPoints = []
    async for msg in consumer:
        try:
            consumedMsg = loads(msg.value.decode('utf-8'))
            if consumedMsg['tid']:
                dataPoints.append(consumedMsg)
            if len(dataPoints)==batchsize or time.time() - startTime>5:
                #1: The task below goes and sends HTTP GET requests in bulk using aiohttp
                task = asyncio.ensure_future(getRequests(data, dataPoints))
                res = await asyncio.gather(*[task])
                if task.done():
                    outputs = []
                    #2: Does some ETL on the returned values
                    ids = await asyncio.gather(*[doSomething(**{'tid':x['tid'],
                                            'cid':x['cid'], 'tn':x['tn'],
                                            'id':x['id'], 'ix':x['ix'],
                                            'ac':x['ac'], 'output':to_dict(xmltodict.parse(x['response'],encoding='utf-8')),
                                            'loop':loop, 'option':1}) for x in res[0]])
                    simplySaveDataIntoDataBase(id) # This is where I see some missing data in the database
                dataPoints = []
        except Exception as e:
            exc_type, exc_obj, exc_tb = sys.exc_info()
            fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
            logger.error(str(exc_type) +' '+ str(fname) +' '+ str(exc_tb.tb_lineno))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(Kconsumer(data, loop, batchsize=100))

Does the ensure_future need to be awaited? How does aiohttp handle requests that come a little later than the others? Shouldn't it hold the whole batch back instead of forgetting about it altogether?


  • Does the ensure_future need to be awaited?

    Yes, and your code is doing that already. await asyncio.gather(*tasks) awaits the provided tasks and returns their results in the same order.

    Note that await asyncio.gather(*[task]) doesn't make sense, because it is equivalent to await asyncio.gather(task), which in turn is equivalent to await task, except that gather wraps the result in a one-element list (hence the res[0] in your code). In other words, when you need the result of getRequests(data, dataPoints), you can write res = await getRequests(data, dataPoints) and use res directly, without the ceremony of first calling ensure_future() and then calling gather().
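
    As a rough illustration, here is a minimal sketch comparing the two forms. get_requests is a hypothetical stand-in for your getRequests (it does no real HTTP), so treat the names and return values as assumptions rather than your actual code:

```python
import asyncio


async def get_requests(batch):
    # Hypothetical stand-in for getRequests(data, dataPoints): pretend each
    # item was sent over HTTP and attach a dummy 'response' field.
    await asyncio.sleep(0)
    return [dict(item, response="ok") for item in batch]


async def compare(batch):
    # The form used in the question: wrap in a task, gather a one-item list.
    task = asyncio.ensure_future(get_requests(batch))
    via_gather = await asyncio.gather(*[task])

    # The direct form: simply await the coroutine.
    direct = await get_requests(batch)

    # gather() returns a list of results, so the payload sits at index 0.
    return via_gather[0], direct


old_style, new_style = asyncio.run(compare([{"tid": 1}, {"tid": 2}]))
```

    Both variables end up holding the same list; the direct form just skips the extra task and the [0] indexing.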

    In fact, you almost never need to call ensure_future yourself:

    • if you need to await multiple tasks, you can pass coroutine objects directly to gather, e.g. gather(coroutine1(), coroutine2()).
    • if you need to spawn a background task, you can call asyncio.create_task(coroutine(...))
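
    The two cases in the list above could look roughly like this. fetch and log_heartbeat are made-up placeholders for your own coroutines, not part of asyncio or aiohttp:

```python
import asyncio


async def fetch(n):
    # Hypothetical stand-in for a single HTTP request.
    await asyncio.sleep(0.01)
    return n * 2


async def log_heartbeat(events):
    # Hypothetical background job that runs alongside the requests.
    events.append("heartbeat")


async def run_batch():
    events = []
    # Background task: scheduled now, runs while we await other things.
    background = asyncio.create_task(log_heartbeat(events))
    # Coroutine objects go straight into gather(); it wraps them in tasks.
    results = await asyncio.gather(fetch(1), fetch(2), fetch(3))
    # Await the background task before returning so that any exception
    # it raised is surfaced instead of being silently dropped.
    await background
    return results, events


results, events = asyncio.run(run_batch())
```

    Keeping a reference to the task returned by create_task and awaiting it at some point is a reasonable habit, since an exception inside a task nobody awaits is easy to miss.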

  • How does aiohttp handle requests that come a little later than the others? Shouldn't it hold the whole batch back instead of forgetting about it altogether?

    If you use gather, all requests must finish before any of them return. (That is not aiohttp policy, it's how gather works.) If you need to implement a timeout, you can use asyncio.wait_for or similar.
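
    One possible way to combine the two is sketched below, assuming a per-request timeout is what you want. fetch is a hypothetical placeholder for an aiohttp call, and returning None on timeout is just one choice of marker:

```python
import asyncio


async def fetch(delay):
    # Hypothetical stand-in for one HTTP request taking `delay` seconds.
    await asyncio.sleep(delay)
    return delay


async def fetch_with_timeout(delay, timeout):
    # wait_for() cancels the inner coroutine and raises asyncio.TimeoutError
    # if it does not finish within `timeout` seconds.
    try:
        return await asyncio.wait_for(fetch(delay), timeout)
    except asyncio.TimeoutError:
        return None  # marker for "timed out"; pick whatever suits your ETL


async def run_batch(delays, timeout):
    # gather() still waits for every wrapper, but each wrapper now finishes
    # after at most `timeout` seconds. Results keep the input order.
    return await asyncio.gather(
        *(fetch_with_timeout(d, timeout) for d in delays)
    )


results = asyncio.run(run_batch([0.01, 1.0, 0.02], timeout=0.2))
```

    With this shape a slow request shows up as an explicit None in its original position rather than silently disappearing, which makes missing rows easier to trace.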