I have a function that I parallelise using concurrent.futures.ProcessPoolExecutor(), something like this:
import concurrent.futures
import itertools as it

with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(my_func, ids, it.repeat(num_ids))
Here ids is a list of two-element tuples. The first element is an integer that increases by 1 for each subsequent tuple, and I use it for an 'iteration progress tracker'. The second element is an input that my_func uses.
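As an illustration of the ids structure described above, here is a minimal sketch. It assumes the inputs come from a plain list, and build_ids, progress_line and preview_calls are hypothetical helpers. The sketch also shows why passing it.repeat(num_ids) is safe: Executor.map zips its iterables in the same way as the built-in map, so the infinite repeat stops as soon as ids is exhausted.

```python
import itertools as it


def build_ids(inputs):
    """Hypothetical helper: pair each input with a 1-based position for the tracker."""
    return list(enumerate(inputs, start=1))


def progress_line(my_id, num_ids):
    # Same message my_func prints, returned instead so it can be inspected.
    return f"{my_id[0]} of {num_ids}"


def preview_calls(ids, num_ids):
    # Executor.map zips its iterables just like the built-in map does, so the
    # infinite it.repeat(num_ids) stops as soon as ids is exhausted.
    return list(map(progress_line, ids, it.repeat(num_ids)))
```

For example, preview_calls(build_ids(["a", "b"]), 2) gives one progress message per record, in input order.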
my_func is too long to include here, and I haven't been able to produce an MRE that reproduces the behaviour. However, it looks something like this:
def my_func(my_id, num_ids):
    print(f"{my_id[0]} of {num_ids}")
    # Extract something from a database, transform it and then add the new data back into the database
On one run I noticed that at around 5k the iteration tracker suddenly jumped to 10k, stuttered and then kept going. After that it skipped more records every so often. The pattern repeats each time I run the code, but the skips happen in slightly different places.
I duly switched to debug mode in VS Code, but to my surprise no records were skipped when running the code from the debugger. No errors, nothing.
The only way I've found to stop the skipping outside the debugger is to set the max_workers argument to half the number of threads on my machine.
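The half-the-threads workaround could be written along these lines. This is a sketch that assumes "threads" means the logical CPUs reported by os.cpu_count() (which can return None). The helper names are hypothetical. By default, ProcessPoolExecutor starts one worker per processor on the machine.

```python
import concurrent.futures
import os


def half_of_logical_cpus():
    # os.cpu_count() counts logical CPUs (hardware threads) and may return None;
    # never go below one worker.
    return max(1, (os.cpu_count() or 2) // 2)


def run_with_half_workers(func, *iterables):
    # Hypothetical wrapper around the question's executor call, with a capped pool size.
    with concurrent.futures.ProcessPoolExecutor(max_workers=half_of_logical_cpus()) as executor:
        return list(executor.map(func, *iterables))
```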
I realise that without an MRE this is difficult to diagnose, but I'm hoping someone else has experienced this issue or recognises these symptoms.
This is not easy to answer for your specific case, given the information provided.
My best guess is that an exception occurs in the body of my_func.
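Indeed, if an exception occurs, no traceback is printed and execution does not stop. The exception is stored in the corresponding future and is only re-raised when that result is retrieved, and your code never consumes the iterator returned by executor.map.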
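The following small sketch illustrates this behaviour. risky is a hypothetical stand-in for my_func that fails on one input. The executor class is passed in as a parameter because ProcessPoolExecutor and ThreadPoolExecutor both get this deferred-exception behaviour from Executor.map. A thread pool is enough to demonstrate it without spawning processes.

```python
import concurrent.futures


def risky(n):
    # Hypothetical stand-in for my_func that fails on one input.
    if n == 3:
        raise ValueError(f"bad input {n}")
    return n * 2


def run_discarding_results(executor_cls, inputs):
    with executor_cls() as executor:
        # The iterator returned by map is never consumed, so the ValueError
        # stored in the failed future is never re-raised.
        executor.map(risky, inputs)
    return "no error surfaced"


def run_consuming_results(executor_cls, inputs):
    with executor_cls() as executor:
        try:
            # Iterating over the results re-raises the first exception in input order.
            return list(executor.map(risky, inputs))
        except ValueError as exc:
            return repr(exc)
```

Calling run_discarding_results(concurrent.futures.ThreadPoolExecutor, [1, 2, 3, 4]) finishes quietly. The consuming version reports the ValueError for input 3.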
To test this assumption, I would define a decorator that prints any exception raised in the function:
import functools


def log_function(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # Report the failing call's arguments and the exception, then return None
            print(args, kwargs, repr(exc))
    return wrapper
Then apply the decorator to the function:
@log_function
def my_func(my_id, num_ids):
...
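Another way to check the same hypothesis, without modifying my_func, is to collect the exceptions in the parent process. This alternative uses executor.submit together with concurrent.futures.as_completed and records which ids failed. In this minimal sketch, my_func is a hypothetical stand-in that fails on one record to imitate a database error, and run_and_collect_errors is a hypothetical helper.

```python
import concurrent.futures


def my_func(my_id, num_ids):
    # Hypothetical stand-in: fails for one record to imitate a database error.
    index, payload = my_id
    if index == 2:
        raise KeyError(payload)
    return payload.upper()


def run_and_collect_errors(executor_cls, ids):
    """Submit every record and return (results, errors) keyed by the record's index."""
    num_ids = len(ids)
    results, errors = {}, {}
    with executor_cls() as executor:
        # Map each future back to the record it was submitted for.
        futures = {executor.submit(my_func, my_id, num_ids): my_id for my_id in ids}
        for future in concurrent.futures.as_completed(futures):
            index = futures[future][0]
            exc = future.exception()  # None if the call succeeded
            if exc is None:
                results[index] = future.result()
            else:
                errors[index] = repr(exc)
    return results, errors
```

To use this with the setup in the question, pass concurrent.futures.ProcessPoolExecutor and keep my_func at module level so it can be pickled. Call run_and_collect_errors from inside an `if __name__ == "__main__":` block. Any ids that show up in errors are the "skipped" records.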