Search code examples
pythonnumpyscikit-learnmultiprocessingbigdata

How to use multiprocessing Pool when evaluating many images using scikit-learn pipeline?


I used a GridSearchCV pipeline for training several different image classifiers in scikit-learn. In the pipeline I used two stages, scaler and classifier. The training run successfully, and this is what turned out to be the best hyper-parameter setting:

Pipeline(steps=[('scaler', MinMaxScaler()),
                ('classifier',
                 ExtraTreesClassifier(criterion='log_loss', max_depth=30,
                                      min_samples_leaf=5, min_samples_split=7,
                                      n_estimators=50, random_state=42))],
         verbose=True)

Now I want to use this trained pipeline to test it on a lot of images. Therefore, I'm reading my test images from disk (150x150px) and store them in a hdf5 file, where each image is represented as a row vector (150*150=22500px), and all images are stacked upon each other in an np.array:

X_test.shape -> (n_imgs,22500)

Then I'm predicting the labels y_preds with

y_preds = model.predict(X_test)

So far, so good, as long as I'm only predicting some images.

But when n_imgs is growing (e.g. 1 Mio images), it doesn't fit into memory anymore. So I was googling around and found some solutions, that unfortunately didn't work.

I'm currently trying to use multiprocessing.pool.Pool. Now my problem: I want to call multiprocessing's Pool.map(), like so:

n_cores = 10
with Pool(n_cores) as pool:
    results = pool.map(model.predict, X_test, chunksize=22500)

but suddenly all workers say:

All workers say 'Stopping'

without further details, no matter what chunksize I use.

So I tried to reshape X_test so that each image is represented blockwise next to each other:

X_reshaped = np.reshape(X_test,(n_imgs,150,150))

now chunksize picks out whole images, but as my model has been trained on 1x22500 arrays, not quadratic ones, I get the error:

ValueError: X_test has 150 features, but MinMaxScaler is expecting 22500 features as input.

I'd need to reshape the images back to 1x22500 before predict runs on the chunks. But I'd need a function with several inputs, which pool.map() doesn't allow (it only takes 1 argument for the given function).

So I followed Jason Brownlee's post: Multiprocessing Pool map() Multiple Arguments

and packed several variables into a tuple, which I then unpacked in a wrapper function, before calling model.predict():

n_imgs = X_test.shape[0]

X_reshaped = np.reshape(X_test,(n_imgs,150,150)) # reshape each row to 150x150px images

input_tuple = (model,X_reshaped) # pack model and data into a tuple as input for the wrapper

with Pool(n_cores) as pool:
    results = pool.map(predict_wrapper, input_tuple, chunksize=22500)

and the wrapper function:

def predict_wrapper(input_tuple):

    model, X = input_tuple # unpack the input tuple

    n_imgs = X.shape[0]
    X_mod = np.reshape(X,(n_imgs,150*150)) # reshape back

    y_preds = model.predict(X_mod)

    return y_preds

But: input_tuple doesn't get unpacked correctly in the wrapper function:

Wrong unpacking result

As you can see: instead of assigning model to model and X_test to X, it splits my pipeline and assigns the scaler to model and the classifier to X. 🤯

So, long story short:

does anybody have a solution how I can use my trained scikit-learn pipeline and do prediction on a plethora of images? I'm not bound to use multiprocessing.pool.Pool, but I didn't find any other solution so far...

Many thanks in advance! 🤝🏼


Solution

  • Ok, now I finally got this working! Thanks to Frank Yellin's answer here I realized that my problem seemed to be the chunksize I explicitly passed. I thought that by doing so I could force pool.map() to take a certain number of images per chunk, but it behaved differently and complained about the wrong dimensions of the given chunks.

    But inspired by Frank's answer I rather defined the chunks before the call to pool.map() and then passed the chunks to it. Now the images are passed chunkwise to the single workers.

    Seems I could not see the forest for the trees...

    So in the end it looks like this:

    from multiprocessing import Pool
    
    import h5py
    import joblib
    import numpy as np
    
    
    def main_prediction_batch():
    
        # --- load model ---
        model_URL = "<path to model.pkl>"
        with open(model_URL, 'rb') as model_file:
            model = joblib.load(model_file)
    
        # --- load image and label file ---
        hdf5_file_URL = "<path to hdf5 file with images and labels.hdf5>"
        with h5py.File(hdf5_file_URL, mode='r') as hdf5_file:
            X_test = hdf5_file["Images"][:] # 👉️ (n_imgs, 150*150)
            y_test = hdf5_file["Labels"].asstr()[:] # 👉️ (n_imgs,)
    
        n_imgs = X_test.shape[0]
    
        n_cores = 10
    
        image_batching = 10000  # or whatever you want your batch size to be
                                # doesn't have to be a multiple of n_cores!
        chunk_ranges = [(i, min(i + image_batching, n_imgs)) 
                        for i in range(0, n_imgs, image_batching)]
    
        # define chunks of several images
        chunks = [
            (X_test[chunk_ranges[i][0]:chunk_ranges[i][1], :])
            for i in range(len(chunk_ranges))
            ]
    
        with Pool(n_cores) as pool:
            results = pool.map(model.predict, chunks)
    
        # stack the predictions to get a final row vector
        y_preds = np.hstack(results) # can now be compared with y_test
    
        return y_preds
    
    # --------------------------
    #           MAIN
    # --------------------------
    if __name__ == '__main__':
        
        y_preds = main_prediction_batch()
    

    When I now look at it... it was complicated to describe but the final solution was quite simple... thanks a lot for enlightening me!