python-3.x python-asyncio aiohttp

How to handle exceptions when using asyncio.wait with FIRST_COMPLETED?


Is there a way to handle exceptions when using asyncio.wait(coro_obj, return_when=FIRST_COMPLETED)?

I am trying to get my IP from several external resources, but I am not able to handle the exception raised when one of the provided URLs is broken. Execution should stop as soon as any of the requests returns the IP.

import asyncio
import aiohttp
from concurrent.futures import FIRST_COMPLETED


SERVICES = [
    ('ip-api1', 'http://ip-api.com/json', 'query'),
    ('broken_api', 'http://broken', 'query'),
]


async def get_ip(session, service, url, ip_attr):
    print('fetching ip from: {}'.format(service))
    async with session.get(url) as resp:
        json_resp = await resp.json()
        ip = json_resp[ip_attr]
        return service, ip


async def get_ip_from_services():
    async with aiohttp.ClientSession() as session:
        coro_obj = [get_ip(session, service[0], service[1], service[2])
                    for service in SERVICES]
        try:
            done, pending = await asyncio.wait(coro_obj, return_when=FIRST_COMPLETED)
            result = done.pop().result()
            print(result)
            for future in pending:
                future.cancel()
        except:
            print('catched')


loop = asyncio.get_event_loop()
loop.run_until_complete(get_ip_from_services())
loop.close()

I am able to catch the exception when return_when=ALL_COMPLETED, but not when it is FIRST_COMPLETED.

I get the following errors:

fetching ip from: ip-api1
fetching ip from: broken_api
catched
exception calling callback for <Future at 0x103839240 state=finished returned list>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 297, in _invoke_callbacks
    callback(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 419, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
    self._check_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending coro=<ThreadedResolver.resolve() running at /Volumes/external/venv/lib/python3.6/site-packages/aiohttp/resolver.py:31> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py:408, <TaskWakeupMethWrapper object at 0x103807948>()]> cb=[shield.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:679]>
Task was destroyed but it is pending!
task: <Task pending coro=<get_ip() done, defined at ./run_7.py:15> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1038073d8>()]>>

or these warnings:

fetching ip from: ip-api1
fetching ip from: broken_api
catched
Task was destroyed but it is pending!
task: <Task pending coro=<ThreadedResolver.resolve() running at /Volumes/external/venv/lib/python3.6/site-packages/aiohttp/resolver.py:31> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py:408, <TaskWakeupMethWrapper object at 0x1038e2948>()]> cb=[shield.<locals>._done_callback() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:679]>
Task was destroyed but it is pending!
task: <Task pending coro=<get_ip() done, defined at ./run_7.py:15> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1038e23d8>()]>>

Solution

  • I think it's a bug in aiohttp, and I have reported it there.

    The exception you receive doesn't seem to be critical, so you can leave it as is until it is fixed in aiohttp. Alternatively, don't cancel the requests if you don't want to see it for now.

    A few unrelated notes on your code:

    1) Don't use concurrent.futures with asyncio: these are different modules. Use asyncio.FIRST_COMPLETED instead.

    2) You should not only cancel a task but also wait for it to be cancelled (awaiting it raises CancelledError). This can be done with the following code:

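    from contextlib import suppress

    # Inside the coroutine, after asyncio.wait() has returned:
    for task in pending:
        task.cancel()
        # Awaiting a cancelled task raises CancelledError; swallow it here.
        with suppress(asyncio.CancelledError):
            await task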
    

    Read more about tasks/cancelling here.
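
To tie the notes above together, here is a minimal sketch of one way to keep waiting until some service actually succeeds, instead of stopping at the first task that finishes with an exception. It is not taken from the question or from aiohttp: `fake_service` and `first_successful` are hypothetical names, the services are simulated with `asyncio.sleep` so the example runs without network access, and the IP is a placeholder from a documentation range. It assumes Python 3.7+ for `asyncio.run`; on 3.6 you would use `loop.run_until_complete` as in the question.

```python
import asyncio
from contextlib import suppress


async def fake_service(name, delay, fail=False):
    # Hypothetical stand-in for get_ip(); no real HTTP request is made.
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError('{} is unreachable'.format(name))
    return name, '203.0.113.7'  # placeholder IP, purely illustrative


async def first_successful(coros):
    # Wrap the coroutines in tasks so they can be cancelled and awaited later.
    pending = {asyncio.ensure_future(coro) for coro in coros}
    errors = []
    result = None
    try:
        # Keep waiting while tasks remain and nothing has succeeded yet.
        while pending and result is None:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                # exception() inspects the failure without re-raising it,
                # unlike result(), which would raise here.
                if task.exception() is not None:
                    errors.append(task.exception())
                elif result is None:
                    result = task.result()
    finally:
        # Cancel whatever is still running and wait for the cancellation.
        for task in pending:
            task.cancel()
            with suppress(asyncio.CancelledError):
                await task
    return result, errors


async def main():
    return await first_successful([
        fake_service('broken_api', 0.01, fail=True),
        fake_service('slow_api', 0.2),
        fake_service('fast_api', 0.05),
    ])


result, errors = asyncio.run(main())
print(result, errors)
```

The point of the loop is that FIRST_COMPLETED also fires when a task finishes by raising, so a broken service that fails quickly would otherwise be the only task you look at. Collecting failures via `task.exception()` lets the remaining requests continue, and the `finally` block makes sure nothing is left pending when the coroutine returns.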