I am running a webscraper class
who's method name is self.get_with_random_proxy_using_chain
.
I am trying to send multithreaded calls to the same url, and would like that once there is a result from any thread, the method returns a response and closes other still active threads.
So far my code looks like this (probably naive):
from concurrent.futures import ThreadPoolExecutor, as_completed
# class initiation etc
max_workers = cpu_count() * 5
urls = [url_to_open] * 50
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url=[]
for url in urls: # i had to do a loop to include sleep not to overload the proxy server
future_to_url.append(executor.submit(self.get_with_random_proxy_using_chain,
url,
timeout,
update_proxy_score,
unwanted_keywords,
unwanted_status_codes,
random_universe_size,
file_path_to_save_streamed_content))
sleep(0.5)
for future in as_completed(future_to_url):
if future.result() is not None:
return future.result()
But it runs all the threads.
Is there a way to close all threads once the first future has completed. I am using windows and python 3.7x
So far I found this link, but I don't manage to make it work (pogram still runs for a long time).
As far as I know, running futures cannot be cancelled. Quite a lot has been written about this. And there are even some workarounds.
But I would suggest taking a closer look at the asyncio
module. It is quite well suited for such tasks.
Below is a simple example, when several concurrent requests are made, and upon receiving the first result, the rest are canceled.
import asyncio
from typing import Set
from aiohttp import ClientSession
async def fetch(url, session):
async with session.get(url) as response:
return await response.read()
async def wait_for_first_response(tasks):
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for p in pending:
p.cancel()
return done.pop().result()
async def request_one_of(*urls):
tasks = set()
async with ClientSession() as session:
for url in urls:
task = asyncio.create_task(fetch(url, session))
tasks.add(task)
return await wait_for_first_response(tasks)
async def main():
response = await request_one_of("https://wikipedia.org", "https://apple.com")
print(response)
asyncio.run(main())