Search code examples
pythonaiohttppython-asyncio

Canceling coroutines/tasks in proxychecker without waiting for completion


I have created a proxychecker that operates fine when it is left alone to check all the proxies. But I would like to implement functionality so that upon keyboard interrupt it cancels all pending proxychecking coroutines and exits gracefully. In its current state, upon keyboard interrupt the program does not exit gracefully and I get error messages - "Task was destroyed but it is pending!"

After doing some research, I realize that this is happening because I am closing the event loop before the coroutines have finished canceling. I have decided to try and attempt my own implementation of the solution found in this stackoverflow post: What's the correct way to clean up after an interrupted event loop?

However my implementation does not work; it seems that the execution gets stuck in the loop.run_forever() because upon keyboard interrupt, my terminal is stuck in processing.

If possible, I would truly appreciate a solution that does not involve waiting for pending tasks to be finished. The target functionality is that upon keyboard interrupt, the program drops everything, issues a report, and exits.

Also I am new to asyncio so any constructive criticism of how I've structured my program are also truly appreciated.

async def check_proxy(self, id, session):
    proxy = self.plist[0]
    del self.plist[0]
    try:
        self.stats["tries"] += 1
        async with session.head(self.test_url, proxy=proxy, timeout=self.timeout) as response:
            if response and response.status == 200:
                self.stats["alive"] += 1
                self.alive.append(proxy)
                print(f"{id} has found a live proxy : " + proxy)
    except Exception:
        pass

async def main(self):
    tasks = []
    connector = ProxyConnector()
    async with aiohttp.ClientSession(connector=connector, request_class=ProxyClientRequest) as session:
        while len(self.plist) > 0:
            if len(self.plist) >= self.threads:
                for i in range(self.threads):
                    tasks.append(asyncio.ensure_future(self.check_proxy(i+1, session)))
            else:
                for i in range(len(self.plist)):
                    tasks.append(asyncio.ensure_future(self.check_proxy(i+1, session)))
            await asyncio.gather(*tasks)

def start(self):
    self.load_proxies()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(self.main())
    except KeyboardInterrupt:
        for task in asyncio.Task.all_tasks():
            task.cancel()
        loop.run_forever()
        asyncio.Task.all_tasks().exception()
    finally:
        loop.close()
        self.report_stats()

Ideally, the output should look something like this:

...

55 has found a live proxy : socks5://81.10.222.118:1080

83 has found a live proxy : socks5://173.245.239.223:16938

111 has found a live proxy : socks5://138.68.41.90:1080

^C

# of Tries: 160

# of Alive: 32


Solution

  • Took me a while because I was branching out on the wrong track but here was the fix in case others run into the same problem.

    except KeyboardInterrupt:
        for task in asyncio.Task.all_tasks():
            task.cancel()
        loop.stop() #-- rather unintuitive in my opinion, works though.
        loop.run_forever()
    finally:
        loop.close()
    

    Very simple fix; rather unfortunate that I didn't realize sooner. Hopefully this can prevent someone else from a similar descent into madness. Cheers!