I have a machine with 24 cores and 2 threads per core. I'm trying to optimize the following code for parallel execution. However, I noticed that the code's performance starts to degrade after a certain number of threads.
import argparse
import glob
import h5py
import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm
import time
import datetime
from multiprocessing import Pool, cpu_count, Lock
import multiprocessing
import cProfile, pstats, io
def process_parcel_file(f, bands, mask):
start_time = time.time()
test = xr.open_dataset(f)
print(f"Elapsed in process_parcel_file for reading dataset: {time.time() - start_time}")
start_time = time.time()
subset = test[bands + ['SCL']].copy()
subset = subset.where(subset != 0, np.nan)
if mask:
subset = subset.where((subset.SCL >= 3) & (subset.SCL < 7))
subset = subset[bands]
# Adding a new dimension week_year and performing grouping
subset['week_year'] = subset.time.dt.strftime('%Y-%U')
subset = subset.groupby('week_year').mean().sortby('week_year')
subset['id'] = test['id'].copy()
# Store the dates and counting pixels for each parcel
dates = subset.week_year.values
n_pixels = test[['id', 'SCL']].groupby('id').count()['SCL'][:, 0].values.reshape(-1, 1)
# Converting to dataframe
grouped_sum = subset.groupby('id').sum()
ids = grouped_sum.id.values
grouped_sum = grouped_sum.to_array().values
grouped_sum = np.swapaxes(grouped_sum, 0, 1)
grouped_sum = grouped_sum.reshape((grouped_sum.shape[0], -1))
colnames = ["{}_{}".format(b, str(x).split('T')[0]) for b in bands for x in dates] + ['count']
values = np.hstack((grouped_sum, n_pixels))
df = pd.DataFrame(values, columns=colnames)
df.insert(0, 'id', ids)
print(f"Elapsed in process_parcel_file til end: {time.time() - start_time}")
return df
def fs_creation(input_dir, out_file, labels_to_keep=None, th=0.1, n=64, days=5, total_days=180, mask=False,
mode='s2', method='patch', bands=['B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B08', 'B8A', 'B11', 'B12']):
files = glob.glob(input_dir)
times_pool = [] # For storing execution times
times_seq = []
cpu_counts = list(range(2, multiprocessing.cpu_count() + 1, 4)) # The different CPU counts to use
for count in cpu_counts:
print(f"Executing with {count} threads")
if method == 'parcel':
start_pool = time.time()
with Pool(count) as pool:
arguments = [(f, bands, mask) for f in files]
dfs = list(tqdm(pool.starmap(process_parcel_file, arguments), total=len(arguments)))
end_pool = time.time()
start_seq = time.time()
dfs = pd.concat(dfs)
dfs = dfs.groupby('id').sum()
counts = dfs['count'].copy()
dfs = dfs.div(dfs['count'], axis=0)
dfs['count'] = counts
dfs.drop(index=-1).to_csv(out_file)
end_seq = time.time()
times_pool.append(end_pool - start_pool)
times_seq.append(end_seq - start_seq)
pd.DataFrame({'CPU_count': cpu_counts, 'Time pool': times_pool,
'Time seq' : times_seq}).to_csv('cpu_times.csv', index=False)
return 0
When executing the code, it scales well up to around 7-8 threads, but after that, the performance starts to deteriorate. I have profiled the code, and it seems that each thread takes more time to execute the same code.
For example, with 2 threads:
Elapsed in process_parcel_file for reading dataset: 0.012271404266357422
Elapsed in process_parcel_file til end: 1.6681673526763916
Elapsed in process_parcel_file for reading dataset: 0.014229536056518555
Elapsed in process_parcel_file til end: 1.5836331844329834
However, with 22 threads:
Elapsed in process_parcel_file for reading dataset: 0.17968058586120605
Elapsed in process_parcel_file til end: 12.049026727676392
Elapsed in process_parcel_file for reading dataset: 0.052398681640625
Elapsed in process_parcel_file til end: 6.014119625091553
I'm struggling to understand why the performance degrades with more threads. I've already verified that the system has the required number of cores and threads.
I would appreciate any guidance or suggestions to help me identify the cause of this issue and optimize the code for better performance.
It's really hard for me to provide a minimal working example so take that into account.
Thank you in advance.
Edit: The files are around 80MB each. I have 451 files. I added the following code to profile the function:
...
start_time = time.time()
mem_usage_start = memory_usage(-1, interval=0.1, timeout=None)[0]
cpu_usage_start = psutil.cpu_percent(interval=None)
test = xr.open_dataset(f)
times['read_dataset'] = time.time() - start_time
memory['read_dataset'] = memory_usage(-1, interval=0.1, timeout=None)[0] - mem_usage_start
cpu_usage['read_dataset'] = psutil.cpu_percent(interval=None) - cpu_usage_start
...
And more code for each line in a similar fashion.
I used the libraries memory_profiler
and psutil
, and I have the information for each thread.
CSV's with the results are available here:
https://wetransfer.com/downloads/44df14ea831da7693300a29d8e0d4e7a20230703173536/da04a0
The results identify each line in the function with the number of cpus selected, so each one is a thread.
Edit2:
Here I have a report of a subset of the data, where you can clearly see what each thread is doing, and how some threads are getting less work than others:
https://wetransfer.com/downloads/259b4e42aae6dd9cda5a22d576aba29520230717135248/ae3f88
Replace xarray.open_dataset
with xarray.load_dataset
to immediately load the data into memory; the former is quite lazy, while the latter is a bit more eager (a.k.a greedy, strict).
[if this answer is accepted]
[At the risk of repeating previous comments/answers,] This appears to be an I/O throughput limitation, where too many threads/processes attempt to simultaneously read from the same device, ironically slowing things down for all sibling threads/processes.
As for what's causing this, I believe it to be the call to xarray
's open_dataset
on line 17: test = xr.open_dataset(f)
, which by some specific interpretation of the docs may appear to be loading the data lazily:
Data is always loaded lazily from netCDF files. You can manipulate, slice and subset Dataset and DataArray objects, and no array values are loaded into memory until you try to perform some sort of actual computation.
In case this is true, it can explain the symptoms you're exhibiting - 451 files initially get represented by "empty" objects (which takes an almost insignificant amount of time), and as these objects' data is read (or manipulated) almost simultaneously - the storage device gets stormed by tens of thousands (or millions, depending on blocksize) of read requests.
There is a tip, four paragraphs down :
Xarray’s lazy loading of remote or on-disk datasets is often but not always desirable. Before performing computationally intense operations, it is often a good idea to load a Dataset (or DataArray) entirely into memory by invoking the Dataset.load() method.
Alternatively, try using load_dataset.