I am trying to run cross-validation folds in parallel with the joblib library in Python.
I have the following sample code:
from sklearn.model_selection import KFold
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn import svm
from sklearn import datasets
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import LinearSVC
iris = datasets.load_iris()
X, Y = iris.data, iris.target
skf = StratifiedKFold(n_splits=5)
#clf = svm.LinearSVC()
clf = svm.SVC(kernel='rbf')
#clf = svm.SVC(kernel='linear')
f1_list = []
for train_index, test_index in skf.split(X, Y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = Y[train_index], Y[test_index]
    clf.fit(X_train, y_train)
    Y_predict = clf.predict(X_test)
    f1 = f1_score(y_test, Y_predict, average='weighted')
    print(f1)
    conf_mat = confusion_matrix(y_test, Y_predict)
    print(conf_mat)
    f1_list.append(f1)
print(f1_list)
I would like to run the iterations of the for loop in parallel, so that the F1 score and confusion matrix for each fold are computed concurrently.
I believe the joblib library has to be used in the following way:
from math import sqrt
from joblib import Parallel, delayed
def producer():
    for i in range(6):
        print('Produced %s' % i)
        yield i

out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
    delayed(sqrt)(i) for i in producer())
Any suggestions on how to integrate joblib so that the folds run in parallel?
Parallel is configured through its constructor, and the resulting object is then called with an iterable of tasks. You create each task with delayed, which returns a new function that wraps yours. Calling that wrapper does not run your function immediately; it records the function together with the arguments you pass, so that Parallel can run the call later in a worker.
In your example, sqrt is wrapped by delayed and called with each i yielded by producer() (the values from range(6)); Parallel then runs those calls across the workers.
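To make the deferred call concrete, here is a minimal sketch. It assumes the behaviour of current joblib releases, where a delayed call evaluates to a plain (function, args, kwargs) tuple; that tuple form is an implementation detail rather than something to rely on in real code.

```python
from math import sqrt

from joblib import Parallel, delayed

# delayed(sqrt)(4) does not compute anything yet: it only records the call.
task = delayed(sqrt)(4)
func, args, kwargs = task
print(func is sqrt, args, kwargs)

# Parallel consumes an iterable of such tasks and returns the results
# in the same order as the inputs.
results = Parallel(n_jobs=2)(delayed(sqrt)(i) for i in (0, 1, 4, 9))
print(results)
```

The first print shows that nothing was evaluated when the task was built; the square roots are only computed once Parallel is called.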
What we need to do is pass delayed a function that trains and evaluates on one fold, and then call the wrapped function with the train and test indices of each k-fold split. Here's an example of doing that:
from sklearn.model_selection import KFold
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn import svm
from sklearn import datasets
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import LinearSVC
from joblib import Parallel, delayed
iris = datasets.load_iris()
X, Y = iris.data, iris.target
skf = StratifiedKFold(n_splits=5)
clf = svm.SVC(kernel='rbf')
def train(train_index, test_index):
    # Fit on one fold's training rows and score on its test rows.
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = Y[train_index], Y[test_index]
    clf.fit(X_train, y_train)
    Y_predict = clf.predict(X_test)
    f1 = f1_score(y_test, Y_predict, average='weighted')
    conf_mat = confusion_matrix(y_test, Y_predict)
    return dict(f1=f1, conf_mat=conf_mat)

out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
    delayed(train)(train_index, test_index) for train_index, test_index in skf.split(X, Y))
f1_scores = [d['f1'] for d in out]
conf_mats = [d['conf_mat'] for d in out]
print('f1_scores:', f1_scores)
print('confusion matrices:', conf_mats)
Out:
f1_scores: [0.9665831244778613, 1.0, 0.9665831244778613, 0.9665831244778613, 1.0]
confusion matrices: [array([[10, 0, 0],
[ 0, 10, 0],
[ 0, 1, 9]], dtype=int64), array([[10, 0, 0],
[ 0, 10, 0],
[ 0, 0, 10]], dtype=int64), array([[10, 0, 0],
[ 0, 9, 1],
[ 0, 0, 10]], dtype=int64), array([[10, 0, 0],
[ 0, 9, 1],
[ 0, 0, 10]], dtype=int64), array([[10, 0, 0],
[ 0, 10, 0],
[ 0, 0, 10]], dtype=int64)]
out is a list with one dictionary per fold, in the same order as the splits produced by skf.split, holding the metrics returned by the train function. That is why we can split out the F1 scores and confusion matrices separately if we need to.
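One thing to watch in the example above is that train fits the module-level clf. With joblib's default process-based workers each worker gets its own copy, but with a thread-based backend the folds would all refit the same object. A possible variant, sketched here under the assumption that you are free to change the function signature, passes everything in explicitly and fits a fresh copy made with sklearn.base.clone. The name evaluate_fold is hypothetical and not part of either library.

```python
from joblib import Parallel, delayed
from sklearn import datasets, svm
from sklearn.base import clone
from sklearn.metrics import confusion_matrix, f1_score
from sklearn.model_selection import StratifiedKFold


def evaluate_fold(estimator, X, Y, train_index, test_index):
    """Fit a fresh copy of `estimator` on one fold and return its metrics."""
    # clone() gives an unfitted copy with the same hyperparameters,
    # so no fold ever touches the estimator that was passed in.
    model = clone(estimator)
    model.fit(X[train_index], Y[train_index])
    Y_predict = model.predict(X[test_index])
    return {
        "f1": f1_score(Y[test_index], Y_predict, average="weighted"),
        "conf_mat": confusion_matrix(Y[test_index], Y_predict),
    }


iris = datasets.load_iris()
X, Y = iris.data, iris.target
skf = StratifiedKFold(n_splits=5)
clf = svm.SVC(kernel="rbf")

# One task per fold; results come back in fold order.
out = Parallel(n_jobs=2)(
    delayed(evaluate_fold)(clf, X, Y, train_index, test_index)
    for train_index, test_index in skf.split(X, Y)
)
f1_scores = [d["f1"] for d in out]
print("f1_scores:", f1_scores)
```

Because the function no longer depends on global state, it can also be moved into its own module and reused with other estimators or datasets.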