
Dask Delayed Error - AttributeError: '_thread._local' object has no attribute 'value'


I've been racking my brain trying to figure out why I cannot execute this parallelizable function on Dask. Essentially, I have a function that loads a Keras model (which I'm storing with MLflow) and then calls the model's predict method on input data that I am sending in batches. The code below results in the following error:

AttributeError: '_thread._local' object has no attribute 'value'

Code Example:

import numpy as np
import mlflow
import mlflow.keras
from dask import delayed, compute
from dask.distributed import Client
from tensorflow.keras import backend as K  # or: from keras import backend as K

@delayed
def load_and_predict(input_data_chunk):
    # Custom loss is needed to deserialize the model from MLflow
    def contrastive_loss(y_true, y_pred):
        margin = 1
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

    mlflow.set_tracking_uri('<tracking_uri>')
    mlflow.set_experiment('experiment_name')
    runs = mlflow.search_runs()
    artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
    model = mlflow.keras.load_model(artifact_uri + '/model',
                                    custom_objects={'contrastive_loss': contrastive_loss})
    y_pred = model.predict(input_data_chunk)
    return y_pred

with Client(<scheduler_ip:port>) as client:
    batch_array = np.array_split(X_test, 10)
    results = []
    for batch in batch_array:
        prediction = load_and_predict(batch)
        results.append(prediction)

compute(*results)

I am admittedly quite new to Dask, so any expert guidance on this problem would be much appreciated.


Solution

  • If you intend to use the Client you create in the context manager to run the computation, then the compute() call must also be inside the with block: indent it so it executes while the client is still connected, as sketched below.
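
A minimal sketch of the corrected driver code, reusing the load_and_predict function and the placeholders (scheduler address, X_test) from the question:

from dask import compute
from dask.distributed import Client
import numpy as np

with Client(<scheduler_ip:port>) as client:
    batch_array = np.array_split(X_test, 10)
    # Build the lazy task graph, then trigger execution while the
    # client is still open so the work is sent to the cluster.
    results = [load_and_predict(batch) for batch in batch_array]
    predictions = compute(*results)

With compute() indented into the with block, the delayed tasks are submitted through the connected client rather than after it has already been closed on exit from the context manager.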