I've been racking my brain trying to figure out why I cannot execute this parallelizable function on Dask. Essentially, I have a function that loads a Keras model (I'm storing the model using MLflow) and then calls the model's predict method on some input data that I am sending in batches. This piece of code (below) results in the following error:
AttributeError: '_thread._local' object has no attribute 'value'
Code Example:
from dask import delayed, compute
from dask.distributed import Client
import numpy as np
import mlflow
from tensorflow.keras import backend as K

@delayed
def load_and_predict(input_data_chunk):
    def contrastive_loss(y_true, y_pred):
        margin = 1
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

    mlflow.set_tracking_uri('<tracking_uri>')
    mlflow.set_experiment('experiment_name')
    runs = mlflow.search_runs()
    artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
    model = mlflow.keras.load_model(artifact_uri + '/model',
                                    custom_objects={'contrastive_loss': contrastive_loss})
    y_pred = model.predict(input_data_chunk)
    return y_pred

with Client(<scheduler_ip:port>) as client:
    batch_array = np.array_split(X_test, 10)
    results = []
    for batch in batch_array:
        prediction = load_and_predict(batch)
        results.append(prediction)
compute(*results)
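As an aside on the batching step: `np.array_split` (unlike `np.split`) tolerates splits that don't divide the array evenly, which is why it works here regardless of the size of `X_test`. A quick sketch, using a made-up 103-row array as a stand-in for `X_test`:

```python
import numpy as np

# Hypothetical stand-in for X_test: 103 samples of 4 features each,
# chosen so the 10 chunks cannot all be the same size.
X_test = np.arange(103 * 4).reshape(103, 4)

# np.array_split distributes the remainder over the leading chunks:
# the first 103 % 10 = 3 chunks get 11 rows, the remaining 7 get 10.
batch_array = np.array_split(X_test, 10)

chunk_sizes = [len(chunk) for chunk in batch_array]
print(chunk_sizes)  # [11, 11, 11, 10, 10, 10, 10, 10, 10, 10]
```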
I am admittedly quite new to Dask, so any expert guidance on this problem would be much appreciated.
If you intend to use the Client you create in the context to compute, then the compute() line must be within the context too: please indent it.