My goal is to crawl URLs from a queue concurrently. Depending on the result of a crawl, the queue may be extended. Here is the MWE:
import queue
import time
from concurrent.futures import ThreadPoolExecutor

def get(url):  # let's assume that the HTTP magic happens here
    time.sleep(1)
    return f'data from {url}'

def crawl(url, url_queue: queue.Queue, result_queue: queue.Queue):
    data = get(url)
    result_queue.put(data)
    if 'more' in url:
        url_queue.put('url_extended')
url_queue = queue.Queue()
result_queue = queue.Queue()
for url in ('some_url', 'another_url', 'url_with_more', 'another_url_with_more', 'last_url'):
    url_queue.put(url)

with ThreadPoolExecutor(max_workers=8) as executor:
    while not url_queue.empty():
        url = url_queue.get()
        executor.submit(crawl, url, url_queue, result_queue)

while not result_queue.empty():
    data = result_queue.get()
    print(data)
In this MWE, two URLs require another crawl: 'url_with_more' and 'another_url_with_more'. They are added to the url_queue while crawling. However, this solution ends before those two 'more' URLs are processed; after running, the url_queue still has two entries.
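For instance, checking the queue size after the run shows the leftover entries (Queue.qsize is only approximate in general, but the pool has already exited here):

print(url_queue.qsize())  # 2 -- the two 'more' URLs were never crawled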
How can I make sure that the ThreadPoolExecutor does not exit too early? Have I misunderstood ThreadPoolExecutor?
You have a race condition: the check for more work to submit happens before the worker threads have added new URLs to the queue. Don't exit the thread pool until you have waited on all submitted jobs and then checked whether there is more work to submit:
task_queue = queue.Queue()

with ThreadPoolExecutor(max_workers=8) as executor:
    # exit when no task is pending or waiting to be scheduled
    while not (url_queue.empty() and task_queue.empty()):
        # submit new tasks
        while not url_queue.empty():
            url = url_queue.get()
            task_queue.put(executor.submit(crawl, url, url_queue, result_queue))
        # wait for one task to finish
        if not task_queue.empty():
            task_queue.get().result()
            # process result here if needed
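With this structure, results can also be printed as they arrive instead of only at the end. A minimal sketch, reusing result_queue from the question, drains whatever is ready at the bottom of the outer loop:

        # inside the outer while loop: drain any results that are ready;
        # get_nowait() raises queue.Empty once the queue is drained
        while True:
            try:
                print(result_queue.get_nowait())
            except queue.Empty:
                break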
If tasks have varying run times, you might want to use concurrent.futures.wait with FIRST_COMPLETED instead of blocking on the first task in the queue, so a slow task does not hold up the scheduling of URLs discovered by faster ones:
import concurrent.futures

tasks_to_wait = set()

with ThreadPoolExecutor(max_workers=8) as executor:
    # exit when no task is pending or waiting to be scheduled
    while not (url_queue.empty() and len(tasks_to_wait) == 0):
        # submit new tasks
        while not url_queue.empty():
            url = url_queue.get()
            tasks_to_wait.add(executor.submit(crawl, url, url_queue, result_queue))
        # wait for at least one task to finish
        if len(tasks_to_wait) != 0:
            done, tasks_to_wait = concurrent.futures.wait(
                tasks_to_wait, return_when=concurrent.futures.FIRST_COMPLETED)
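With either variant, the results still land in result_queue, so the printing loop from the question works unchanged after the with block exits. If you also want exceptions raised inside crawl to surface (crawl itself returns nothing), you can check the completed futures right after the wait call; a small sketch:

            # inside the if block, after wait(): result() re-raises any
            # exception that occurred inside crawl()
            for task in done:
                task.result()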