
ThreadPoolExecutor exits before queue is empty


My goal is to concurrently crawl URLs from a queue. Based on the crawling result, the queue may be extended. Here is the MWE:

import queue
from concurrent.futures import ThreadPoolExecutor
import time

def get(url): # let's assume that the HTTP magic happens here
    time.sleep(1)
    return f'data from {url}'

def crawl(url, url_queue: queue.Queue, result_queue: queue.Queue):
    data = get(url)
    result_queue.put(data)
    if 'more' in url:
        url_queue.put('url_extended')

url_queue = queue.Queue()
result_queue = queue.Queue()

for url in ('some_url', 'another_url', 'url_with_more', 'another_url_with_more', 'last_url'): 
    url_queue.put(url)


with ThreadPoolExecutor(max_workers=8) as executor:
    while not url_queue.empty():
        url = url_queue.get()
        executor.submit(crawl, url, url_queue, result_queue)

while not result_queue.empty():
    data = result_queue.get()
    print(data)

In this MWE, two URLs trigger a follow-up crawl: 'url_with_more' and 'another_url_with_more'. For each of them, 'url_extended' is added to the url_queue while crawling.

However, this solution ends before those two 'more' URLs are processed; after running, the url_queue still contains two entries.
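
This can be verified after the with block:

print(url_queue.qsize())  # prints 2: both 'url_extended' entries were never crawled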

How can I make sure that the ThreadPoolExecutor does not exit too early? Have I misunderstood ThreadPoolExecutor?


Solution

  • You have a race condition: the main thread checks url_queue for more work before the worker threads have added the follow-up URLs. With time.sleep(1) simulating the request, the main loop drains the five initial URLs and finds the queue empty long before any worker puts 'url_extended' back, so the with block only waits for the already-submitted futures. The fix is to not exit the thread pool until you have waited on all submitted jobs and then checked whether there is more work to submit.

    task_queue = queue.Queue()
    with ThreadPoolExecutor(max_workers=8) as executor:
        # exit when no task pending or to be scheduled
        while not (url_queue.empty() and task_queue.empty()):
            # submit new tasks
            while not url_queue.empty():
                url = url_queue.get()
                task_queue.put(executor.submit(crawl, url, url_queue, result_queue))
            # wait for one task
            if not task_queue.empty():
                task_queue.get().result()
                # process result here if needed.
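
    Note that task_queue.get().result() also re-raises any exception raised inside crawl, so worker failures surface in the main thread instead of passing silently.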

    If tasks take varying amounts of time, you might want to use concurrent.futures.wait with FIRST_COMPLETED instead of blocking on the oldest task in the queue.

    import concurrent.futures

    tasks_to_wait = set()
    with ThreadPoolExecutor(max_workers=8) as executor:
        # exit when no task pending or to be scheduled
        while not (url_queue.empty() and len(tasks_to_wait) == 0):
            # submit new tasks
            while not url_queue.empty():
                url = url_queue.get()
                tasks_to_wait.add(executor.submit(crawl, url, url_queue, result_queue))
            # wait for at least one task to finish
            if len(tasks_to_wait) != 0:
                done, tasks_to_wait = concurrent.futures.wait(
                    tasks_to_wait, return_when=concurrent.futures.FIRST_COMPLETED)
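
    With either version, once the with block exits, result_queue can be drained exactly as in the question; as a quick sanity check (a minimal sketch, assuming the crawl and queue definitions from the MWE above):

    while not result_queue.empty():
        print(result_queue.get())
    # prints seven lines: the five initial URLs plus the two 'url_extended' crawls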