I am using Python's concurrent.futures library with ThreadPoolExecutor and ProcessPoolExecutor. I want to implement a mechanism to cancel all running or unexecuted tasks if any one of the tasks fails. Specifically, as soon as one task raises an exception, I want to cancel the tasks that haven't started yet, stop waiting on the rest, and re-raise the exception to the caller.
Here is the approach I have tried:
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial

copy_func = partial(copy_from, table_name=table_name, column_string=column_string)

with ProcessPoolExecutor(max_workers=cores_to_use) as executor:
    futures = {executor.submit(copy_func, file_path): file_path for file_path in file_path_list}
    for f in as_completed(futures):
        try:
            f.result()
        except Exception as e:
            executor.shutdown(wait=False)  # Attempt to stop the executor
            for future in futures:
                future.cancel()  # Cancel all futures
            raise e  # Raise the exception
My questions:

1. Is there any difference in how cancellation works between ThreadPoolExecutor and ProcessPoolExecutor?
2. How do I properly clean up the resources of concurrent.futures after an exception?

Thank you!
Your approach is almost correct, but there are some important things to note:
- executor.shutdown(wait=False) only prevents new tasks from being submitted. It doesn't actually cancel the running tasks; tasks that have already started executing will continue until completion.
- future.cancel() only cancels tasks that are still in the queue (not started). If a task has already started, future.cancel() won't stop it.

The short sketch below illustrates this difference in isolation.
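This is a minimal standalone sketch (slow_task is a made-up placeholder): with a single worker, the first submitted task starts immediately and cannot be cancelled, while the one still waiting in the queue can.

from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(n):
    time.sleep(1)
    return n

with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(slow_task, 1)  # picked up immediately
    queued = executor.submit(slow_task, 2)   # waits behind the first task
    time.sleep(0.1)                          # give the first task time to start
    print(running.cancel())  # False: already running, cannot be cancelled
    print(queued.cancel())   # True: still queued, cancelled successfully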
To cancel running tasks effectively, you need to structure the work so that the tasks themselves can be interrupted, for example by checking a flag. A better approach is therefore to combine:

- a shared cancellation flag that every worker checks before doing its work, and
- a try/except block inside the worker function (copy_from) itself, so a failure sets the flag and propagates.

One caveat: with ProcessPoolExecutor, a plain threading.Event is not shared between worker processes, so the flag below is a multiprocessing.Manager().Event(), whose proxy can be passed to the workers. Here's the code:
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
import multiprocessing

def copy_from(file_path, table_name, column_string, cancel_event):
    # Skip the work entirely if another task has already failed
    if cancel_event.is_set():
        print(f"Task {file_path} cancelled.")
        return
    try:
        # Your actual copy operation here
        if file_path == "some_bad_file":
            raise ValueError("Simulated task failure")
        print(f"Processing {file_path}")
    except Exception:
        cancel_event.set()  # Signal all other workers to stop
        raise

# Function to handle task execution
def run_tasks(file_path_list, table_name, column_string, cores_to_use):
    with multiprocessing.Manager() as manager:
        cancel_event = manager.Event()  # Shared across worker processes
        copy_func = partial(copy_from, table_name=table_name,
                            column_string=column_string,
                            cancel_event=cancel_event)
        with ProcessPoolExecutor(max_workers=cores_to_use) as executor:
            futures = {executor.submit(copy_func, file_path): file_path
                       for file_path in file_path_list}
            try:
                for f in as_completed(futures):
                    # Raises the task's exception here if it failed
                    f.result()
            except Exception:
                cancel_event.set()  # Tell running workers to stop early
                for future in futures:
                    future.cancel()  # Cancel futures that haven't started yet
                raise  # Re-raise the exception to the caller
if __name__ == "__main__":  # required for ProcessPoolExecutor on spawn-based platforms
    file_path_list = ["file1", "file2", "some_bad_file", "file4"]
    table_name = "my_table"
    column_string = "col1, col2"
    try:
        run_tasks(file_path_list, table_name, column_string, cores_to_use=4)
    except Exception as ex:
        print(f"An error occurred: {ex}")
To ensure that the exception is not silently ignored, it is critical to:

- Call f.result() in the for f in as_completed(futures) loop. This will raise any exception that occurred during task execution.
- Re-raise the exception after cancelling the other tasks, as you have done.

How do I clean up the resources of concurrent.futures after an exception?

By default, when the with ProcessPoolExecutor block exits, it calls executor.shutdown(), which releases resources. This is handled automatically by Python's context manager, so calling executor.shutdown(wait=False) yourself is unnecessary if you've handled cancellation properly.
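As a side note: on Python 3.9 or newer, executor.shutdown() takes a cancel_futures argument that cancels every pending future for you, so the manual loop calling future.cancel() is no longer needed. A sketch of that variant (run_tasks_39 and copy_func are placeholder names):

from concurrent.futures import ProcessPoolExecutor, as_completed

def run_tasks_39(copy_func, file_path_list, cores_to_use):
    executor = ProcessPoolExecutor(max_workers=cores_to_use)
    try:
        futures = [executor.submit(copy_func, p) for p in file_path_list]
        for f in as_completed(futures):
            f.result()  # re-raises the first failure
    finally:
        # cancel_futures=True cancels everything still in the queue;
        # tasks that already started still run to completion
        executor.shutdown(wait=True, cancel_futures=True)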
To ensure resources are properly cleaned up:

- Let the with block (or a finally clause) call executor.shutdown() for you.
- Cancel the futures that haven't started with future.cancel() (or cancel_futures=True on 3.9+).

I hope this will help you a little.