I have an async service that processes data. My current approach is to process the different folds created by `TimeSeriesSplit` in parallel. Since this is a CPU-heavy task, I decided to use `concurrent.futures.ProcessPoolExecutor`, where each worker loads its own fold (using the list of corresponding indices that are passed as arguments to the blocking function `process_single_fold`) and then performs CPU-heavy operations on that data. Whenever a fold is processed, I store it in the database, using `asyncio.as_completed(futures)`.
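For reference, `splits` in the code below is produced along these lines (a minimal sketch; the real data and parameters come from my config, and `X` here is just a placeholder):

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X = np.arange(1000).reshape(-1, 1)      # placeholder time-ordered data
tscv = TimeSeriesSplit(n_splits=5)      # n_splits comes from config.n_splits
splits = list(tscv.split(X))            # [(train_index, test_index), ...]
```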
However, I noticed that when I increase `max_workers` in `ProcessPoolExecutor` (e.g. to 5), my code sometimes hangs, I guess depending on the load on my machine (and in that case, even though I see log messages saying the processing finished, some workers still show as 'running').
```python
with ProcessPoolExecutor(max_workers=min(config.n_splits, os.cpu_count())) as pool:
    loop = asyncio.get_running_loop()
    table_created_flag = asyncio.Event()
    futures = [
        loop.run_in_executor(
            pool,
            process_single_fold,
            ctx._settings.SRCDB_URL,
            fold,
            train_index,
            test_index,
            config,
        )
        for fold, (train_index, test_index) in enumerate(splits)
    ]
    table_cache = {}
    # note: `fold` here is the completion order, not the original fold index
    for fold, future in enumerate(asyncio.as_completed(futures)):
        await store_dataframes_incrementally(
            ctx.dst_db.engine,
            future,
            config,
            fold,
            table_created_flag,
            table_cache,
        )
```
`process_single_fold` performs I/O to connect to the database (it creates its own SQLAlchemy engine using `NullPool`) and loads the data for the given fold. The table flag in the code indicates whether the destination table has been created yet (which happens when the first fold is processed and ready).
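For context, roughly what `process_single_fold` does (a simplified sketch; `config.query` and the processing step are placeholders, not my actual code):

```python
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

def process_single_fold(db_url, fold, train_index, test_index, config):
    # each worker process builds its own engine; NullPool avoids carrying a
    # connection pool across the process boundary
    engine = create_engine(db_url, poolclass=NullPool)
    try:
        df = pd.read_sql(config.query, engine)   # placeholder query
        train = df.iloc[train_index]
        test = df.iloc[test_index]
        # ... CPU-heavy processing of train/test ...
        return fold, train, test
    finally:
        engine.dispose()
```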
My questions: first, is there a flaw in the design of this solution, and why? How do I handle a worker process getting randomly terminated (possibly by the OS) or hanging (can I kill the process and retry that fold, for example)? And second, if this is an expected outcome, is the main thing I need to do simply to set `max_workers` wisely?
I am using WSL.
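For the 'kill the process and retry' part of the question, my understanding is that a worker that dies abruptly surfaces as `BrokenProcessPool` on its future, and a hung fold could at least be detected with a timeout. A sketch of that idea (`FOLD_TIMEOUT` and the function name are made up, not from my code):

```python
import asyncio
from concurrent.futures.process import BrokenProcessPool

FOLD_TIMEOUT = 600  # seconds; made-up bound for a single fold

async def gather_folds(futures):
    # asyncio.wait returns at the timeout without cancelling anything, so
    # hung folds end up in `pending` and can be dealt with explicitly
    done, pending = await asyncio.wait(futures, timeout=FOLD_TIMEOUT)
    for future in done:
        try:
            result = future.result()
            # ... store the finished fold ...
        except BrokenProcessPool:
            # a worker died abruptly (e.g. OOM-killed); the whole pool is
            # unusable afterwards and must be recreated before any retry
            ...
    if pending:
        # there is no public API to kill a single worker, so the usual remedy
        # is pool.shutdown(cancel_futures=True) followed by resubmitting the
        # unfinished folds on a fresh pool
        ...
```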
Update: I had a silly bug in that the `ProcessPoolExecutor` was being recreated for each request, and I believe that was causing the hanging problem, since a new request could disrupt one that was already running. Make sure the pool is created only once and reused by multiple requests.
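A minimal sketch of that pattern (the helper names are made up; adapt to whatever framework the service uses):

```python
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor
from typing import Optional

_pool: Optional[ProcessPoolExecutor] = None

def get_pool() -> ProcessPoolExecutor:
    # lazily create a single pool and reuse it for every request, instead of
    # constructing (and tearing down) a new one inside each request handler
    global _pool
    if _pool is None:
        _pool = ProcessPoolExecutor(max_workers=os.cpu_count())
    return _pool

async def handle_request(splits, fn):
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(get_pool(), fn, fold, train_idx, test_idx)
        for fold, (train_idx, test_idx) in enumerate(splits)
    ]
    return [await f for f in asyncio.as_completed(futures)]
```

Note that this also means dropping the `with ProcessPoolExecutor(...)` context manager from the request handler, since exiting it shuts the pool down; call `shutdown()` once at service teardown instead.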