Search code examples
pythoncluster-computingpython-multiprocessingbatch-processinghpc

How to distribute multiprocess CPU usage over multiple nodes?


I am trying to run a job on an HPC using multiprocessing. Each process has a peak memory usage of ~44GB. The job class I can use allows 1-16 nodes to be used, each with 32 CPUs and a memory of 124GB. Therefore if I want to run the code as quickly as possible (and within the max walltime limit) I should be able to run 2 CPUs on each node up to a maximum of 32 across all 16 nodes. However, when I specify mp.Pool(32) the job quickly exceeds the memory limit, I assume because more than two CPUs were used on a node.

My natural instinct was to specify 2 CPUs as the maximum in the pbs script I run my python script from, however this configuration is not permitted on the system. Would really appreciate any insight, having been scratching my head on this one for most of today - and have faced and worked around similar problems in the past without addressing the fundamentals at play here.

Simplified versions of both scripts below:

#!/bin/sh 

#PBS -l select=16:ncpus=32:mem=124gb
#PBS -l walltime=24:00:00

module load anaconda3/personal
source activate py_env

python directory/script.py
#!/usr/bin/env python
# -*- coding: utf-8 -*- 

import numpy as np
import pandas as pd
import multiprocessing as mp

def df_function(df, arr1, arr2):
    df['col3'] = some_algorithm(df, arr1, arr2)
    return df

def parallelize_dataframe(df, func, num_cores):
    df_split = np.array_split(df, num_cores)
    with mp.Pool(num_cores, maxtasksperchild = 10 ** 3) as pool:
        df = pd.concat(pool.map(func, df_split))
    return df

def main():
    # Loading input data    
    direc = '/home/dir1/dir2/'
    file = 'input_data.csv'
    a_file = 'array_a.npy'
    b_file = 'array_b.npy'
    df = pd.read_csv(direc + file)
    a = np.load(direc + a_file)
    b = np.load(direc + b_file)

    # Globally defining function with keyword defaults
    global f
    def f(df):
        return df_function(df, arr1 = a, arr2 = b)

    num_cores = 32 # i.e. 2 per node if evenly distributed.
    # Running the function as a multiprocess:
    df = parallelize_dataframe(df, f, num_cores)
    # Saving:
    df.to_csv(direc + 'outfile.csv', index = False)

if __name__ == '__main__':
    main()

Solution

  • To run your job as-is, you could simply request ncpu=32 and then in your python script set num_cores = 2. Obviously this has you paying for 32 cores and then leaving 30 of them idle, which is wasteful.

    The real problem here is that your current algorithm is memory-bound, not CPU-bound. You should be going to great lengths to read only chunks of your files into memory, operating on the chunks, and then writing the result chunks to disk to be organized later.

    Fortunately Dask is built to do exactly this kind of thing. As a first step, you can take out the parallelize_dataframe function and directly load and map your some_algorithm with a dask.dataframe and dask.array:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*- 
    
    import dask.dataframe as dd
    import dask.array as da
    
    
    def main():
        # Loading input data    
        direc = '/home/dir1/dir2/'
        file = 'input_data.csv'
        a_file = 'array_a.npy'
        b_file = 'array_b.npy'
        df = dd.read_csv(direc + file, blocksize=25e6)
        a_and_b = da.from_np_stack(direc)
    
        df['col3'] = df.apply(some_algorithm, args=(a_and_b,))
        
        # dask is lazy, this is the only line that does any work    
        # Saving:
        df.to_csv(
            direc + 'outfile.csv',
            index = False,
            compute_kwargs={"scheduler": "threads"},  # also "processes", but try threads first
        )
    
    if __name__ == '__main__':
        main()
    

    That will require some tweaks to some_algorithm, and to_csv and from_np_stack work a bit differently, but you will be able to reasonably run this thing just on your own laptop and it will scale to your cluster hardware. You can level up from here by using the distributed scheduler or even deploy it directly to your cluster with dask-jobqueue.