Tags: python, scikit-learn, parallel-processing, numpy-ndarray, joblib

Returning a scikit-learn object while using Joblib


I have a NumPy array, and I am using scikit-learn to transform each (i, j) column along the first axis. I also want to save each fitted transformer in a dict so I can use it later in the code. Here is my code:

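from sklearn.preprocessing import QuantileTransformer

# train_data_numpy must have a float dtype: writing the float output back into a uint8 array would truncate it
scalers_dict = {}
for i in range(train_data_numpy.shape[1]):
    for j in range(train_data_numpy.shape[2]):
        # One scaler per pixel (i, j), fitted on that pixel's values across all samples
        scaler = QuantileTransformer(n_quantiles=60000, output_distribution='uniform')
        train_data_numpy[:, i, j] = scaler.fit_transform(train_data_numpy[:, i, j].reshape(-1, 1)).reshape(-1)
        scalers_dict[(i, j)] = scaler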

My train_data_numpy has shape (60000, 28, 28) (it is the MNIST training set), and this loop takes a very long time to run. I have an AMD Ryzen 9 5950X with 16 cores, and I would like to parallelize this code.

I know, for example, that I could write something like this:

from joblib import Parallel, delayed
Parallel(n_jobs=16)(delayed(QuantileTransformer(n_quantiles=60000, output_distribution='uniform').fit_transform)(train_data_numpy[:, i, j].reshape(-1, 1)) for j in range(train_data_numpy.shape[2]))

But this only returns the transformed columns, not the fitted scaler objects, and I don't know how to use Joblib to get both.
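One way to get the scalers back from Joblib is to have each parallel task return the fitted scaler together with its transformed column: with the default process-based backend, whatever the task function returns is pickled and sent back to the parent process. The following is a minimal sketch of that idea, not code from the original post; `fit_column` and `parallel_quantile_scale` are hypothetical helper names, and a small random array stands in for MNIST.

```python
import numpy as np
from joblib import Parallel, delayed
from sklearn.preprocessing import QuantileTransformer


def fit_column(column, i, j, n_quantiles):
    """Hypothetical helper: fit a QuantileTransformer on one pixel column.

    Returns the indices, the fitted scaler and the transformed column, so the
    parent process receives the scaler instead of only the transformed data.
    """
    scaler = QuantileTransformer(n_quantiles=n_quantiles, output_distribution='uniform')
    transformed = scaler.fit_transform(column.reshape(-1, 1)).reshape(-1)
    return i, j, scaler, transformed


def parallel_quantile_scale(data, n_quantiles, n_jobs=-1):
    """Hypothetical helper: scale every (i, j) column of a 3-D array in parallel.

    Returns a new float64 array (the input is left untouched) and a dict
    mapping (i, j) to the scaler fitted on data[:, i, j].
    """
    scaled = np.empty(data.shape, dtype=np.float64)
    results = Parallel(n_jobs=n_jobs)(
        delayed(fit_column)(data[:, i, j], i, j, n_quantiles)
        for i in range(data.shape[1])
        for j in range(data.shape[2])
    )
    scalers_dict = {}
    for i, j, scaler, transformed in results:
        scaled[:, i, j] = transformed
        scalers_dict[(i, j)] = scaler
    return scaled, scalers_dict


# Small random stand-in for the (60000, 28, 28) MNIST array
toy = np.random.default_rng(0).random((500, 4, 4))
scaled, scalers = parallel_quantile_scale(toy, n_quantiles=100, n_jobs=2)
print(scaled.shape, len(scalers))  # (500, 4, 4) 16
```

For the question's data the call would be `parallel_quantile_scale(train_data_numpy, n_quantiles=60000, n_jobs=16)`. Writing the results into a new float64 array also avoids truncation if the input happens to be uint8. Each of the 784 tasks is small, but `Parallel` batches short tasks automatically (its `batch_size` defaults to `'auto'`).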


Solution

  • You can use Dask-ML, which is built on top of the Dask library and exposes a scikit-learn-compatible API.

    Installation:

    conda install -c conda-forge dask-ml
    
    or
    
    pip install dask-ml
    

    Example

    import time
    from sklearn.datasets import make_classification
    from sklearn.preprocessing import QuantileTransformer as skQT
    from dask_ml.preprocessing import QuantileTransformer as daskQT

    # Large toy dataset for benchmarking: 1,000,000 samples x 100 features
    X, y = make_classification(n_samples=1000000, n_features=100, random_state=2021)

    # Baseline: scikit-learn's QuantileTransformer
    scaler = skQT()
    start_ = time.perf_counter()
    scaler.fit_transform(X)
    end_ = time.perf_counter() - start_
    print("No Parallelism -- Time Elapsed: {}".format(end_))

    # Same transformation with Dask-ML's QuantileTransformer
    scaler = daskQT()
    start_ = time.perf_counter()
    scaler.fit_transform(X)
    end_ = time.perf_counter() - start_
    print("With Parallelism -- Time Elapsed: {}".format(end_))
    

    Result (elapsed time in seconds)

    No Parallelism -- Time Elapsed: 18.680
    With Parallelism -- Time Elapsed: 2.982
    

    My device specs:

    Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz

    Number of cores: 6 (12 logical threads)
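The Dask-ML example above transforms a 2-D `X`, while the question's data is 3-D. scikit-learn's `QuantileTransformer` fits and transforms each feature (column) independently, so the per-pixel loop can be replaced by viewing the (60000, 28, 28) array as (60000, 784) and fitting a single transformer; its `quantiles_` array then holds one column of quantiles per pixel, which can take the place of `scalers_dict`. The sketch below uses scikit-learn only; `fit_flattened` and `transform_flattened` are hypothetical helper names, and the same reshaped 2-D array could presumably be passed to the Dask-ML transformer instead.

```python
import numpy as np
from sklearn.preprocessing import QuantileTransformer


def fit_flattened(data, n_quantiles):
    """Hypothetical helper: fit one QuantileTransformer on (n_samples, H, W) data.

    The array is viewed as (n_samples, H * W); because QuantileTransformer
    handles each feature independently, every pixel (i, j) still gets its own
    quantiles, stored in column i * W + j of scaler.quantiles_.
    """
    n_samples, height, width = data.shape
    flat = data.reshape(n_samples, height * width)
    scaler = QuantileTransformer(n_quantiles=n_quantiles, output_distribution='uniform')
    scaled = scaler.fit_transform(flat).reshape(n_samples, height, width)
    return scaled, scaler


def transform_flattened(data, scaler):
    """Hypothetical helper: apply an already-fitted scaler to new data with the same (H, W) layout."""
    n_samples, height, width = data.shape
    return scaler.transform(data.reshape(n_samples, height * width)).reshape(data.shape)


# Small random stand-in for the (60000, 28, 28) MNIST array
toy = np.random.default_rng(0).random((500, 4, 4))
scaled, scaler = fit_flattened(toy, n_quantiles=100)
print(scaled.shape, scaler.quantiles_.shape)  # (500, 4, 4) (100, 16)
```

Later data (for example a test split) can then be scaled with `transform_flattened(test_data, scaler)`, reusing the quantiles learned on the training set.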