
How to control throughput speed of Python's ThreadPoolExecutor?


I am starting asynchronous tasks using the ThreadPoolExecutor from Python's concurrent.futures. Following this approach, I monitor the progress of the async calls using the tqdm progress bar.

My code looks like this:

import concurrent.futures
from tqdm import tqdm

with concurrent.futures.ThreadPoolExecutor(max_workers=n_jobs) as executor:
    future_to_url = {executor.submit(target_function, URL): URL for URL in URL_list}
    kwargs = {'total': len(future_to_url),  # For tqdm
              'unit': 'URL',                # For tqdm
              'unit_scale': True,           # For tqdm
              'leave': False,               # For tqdm
              'miniters': 50,               # For tqdm
              'desc': 'Scraping Progress'}
    for future in tqdm(concurrent.futures.as_completed(future_to_url), **kwargs):
        URL = future_to_url[future]
        try:
            data = future.result()     # Result of the concurrent call
        except Exception as exc:
            error_handling()           # Handle errors
        else:
            result_handling()          # Handle non-errors

The console output looks like this:

Scraping Progress:   9%|▉  | 3.35k/36.2k [08:18<1:21:22, 6.72URL/s] # I want < 6/s
Scraping Progress:   9%|▉  | 3.40k/36.2k [08:26<1:21:16, 6.72URL/s] # I want < 6/s
Scraping Progress:  10%|▉  | 3.45k/36.2k [08:30<1:20:40, 6.76URL/s] # I want < 6/s
Scraping Progress:  10%|▉  | 3.50k/36.2k [08:40<1:20:51, 6.73URL/s] # I want < 6/s
Scraping Progress:  10%|▉  | 3.55k/36.2k [08:46<1:20:36, 6.74URL/s] # I want < 6/s
Scraping Progress:  10%|▉  | 3.60k/36.2k [08:52<1:20:17, 6.76URL/s] # I want < 6/s

I know I can set up a URL queue and control its size, as described here.

However, I don't know how to control the throughput speed itself. Let's say I want no more than 6 URLs/sec. Can this be achieved by something other than adding time.sleep(n) to target_function() in the above example?

How can I effectively control the throughput speed of ThreadPoolExecutor in Python's concurrent.futures?


Solution

  • In short, there is no built-in way to do this. Once you have created your pool, you cannot change the number of workers without first shutting the pool down and recreating it. There is also no way to make the pool feed tasks to its workers more slowly than they can take them.

    You have a couple of (not so optimal) choices.

    One is to add a sleep, controlled by a global variable, to the worker function. You can then use task-completion callbacks to measure the actual speed and adjust the variable accordingly. But if sleeping is out of the question, this does not work.

    The better, albeit more onerous, way is to write the task manager yourself. In this version you do not use a pool; instead you write a class that manages the workers. You spawn "enough" workers, and the workers listen on a queue for tasks. Your manager feeds this queue at your desired rate. Give the queue a very low maximum size, and if your manager detects that the queue is full, it spawns another worker.
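A minimal sketch of that idea, assuming Python 3 (for `time.monotonic`) and worker threads rather than processes. The helper name `run_paced` and its parameters are hypothetical, and for brevity it uses a fixed number of workers instead of spawning more when the queue fills up:

```python
import queue
import threading
import time


def run_paced(func, items, rate_per_sec, n_workers=4):
    """Hypothetical helper: hand items to worker threads at most rate_per_sec per second."""
    tasks = queue.Queue(maxsize=1)   # tiny queue, so the manager sets the pace
    results = []                     # (item, result, exception) tuples
    lock = threading.Lock()

    def worker():
        while True:
            item = tasks.get()
            if item is None:         # sentinel: no more work for this worker
                break
            try:
                outcome = (item, func(item), None)
            except Exception as exc:
                outcome = (item, None, exc)
            with lock:
                results.append(outcome)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()

    interval = 1.0 / rate_per_sec
    next_slot = time.monotonic()
    for item in items:
        delay = next_slot - time.monotonic()
        if delay > 0:
            time.sleep(delay)        # the manager waits, the workers never sleep
        tasks.put(item)              # blocks while the queue is full
        # Schedule the next hand-off at least one interval after this one.
        next_slot = max(next_slot, time.monotonic()) + interval

    for _ in threads:
        tasks.put(None)              # one sentinel per worker
    for t in threads:
        t.join()
    return results
```

Because the queue is so small, a blocked `put` means the workers cannot keep up with the requested rate; that is the point at which the manager described above would start another worker.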

    But there is no built-in functionality to do what you want, which means some work is needed: either write that machinery yourself, or redesign your program so that it does not feed all of its tasks to the pool in one go but throttles the submissions instead.
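As an illustration of throttling at submission time, here is a hedged sketch that keeps the ThreadPoolExecutor but spaces out the `submit` calls. It assumes Python 3; `submit_throttled`, `max_per_sec` and `on_done` are hypothetical names, not part of concurrent.futures. It caps how fast tasks are started, so the average completion rate cannot exceed that limit either.

```python
import concurrent.futures
import time


def submit_throttled(executor, func, items, max_per_sec, on_done=None):
    """Hypothetical helper: submit func(item) for each item, at most max_per_sec per second.

    Returns a dict mapping each future to its item. If on_done is given, it is
    attached to every future with add_done_callback and receives the future.
    """
    interval = 1.0 / max_per_sec
    futures = {}
    next_start = time.monotonic()
    for item in items:
        delay = next_start - time.monotonic()
        if delay > 0:
            time.sleep(delay)        # pauses the submitting thread, not the workers
        future = executor.submit(func, item)
        if on_done is not None:
            future.add_done_callback(on_done)
        futures[future] = item
        # The next submission may start no sooner than one interval from now.
        next_start = max(next_start, time.monotonic()) + interval
    return futures
```

With the code from the question, this would be called as `submit_throttled(executor, target_function, URL_list, max_per_sec=6)`. Since the call only returns once everything has been submitted, progress reporting and result handling are better done in the `on_done` callback than in a later `as_completed` loop; note that such callbacks usually run in the worker thread that finished the task.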