I want to log a model to MLflow. Once logged, I can predict probabilities with the Python-loaded model, but not with spark_udf. The catch is that I still need the preprocessing function to live inside the model. Here is a reproducible toy example showing exactly where it fails:
import mlflow
from mlflow.models.signature import infer_signature
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import numpy as np
X, y = make_classification(n_samples=1000, n_features=10, n_informative=2, n_classes=2, shuffle=True, random_state=1995)
X, y = pd.DataFrame(X), pd.DataFrame(y, columns=["target"])
# generate column names
X.columns = [f"col_{idx}" for idx in range(len(X.columns))]
X["categorical_column"] = np.random.choice(["a", "b", "c"], size=len(X))
def encode_catcolumn(X):
    X = X.copy()
    # map the categorical values [a, b, c] to [-10, 0, 35] respectively
    X['categorical_column'] = np.select(
        [X["categorical_column"] == "a", X["categorical_column"] == "b", X["categorical_column"] == "c"],
        [-10, 0, 35]
    )
    return X
# the classifier is trained on encoded data; we'll move this custom encoding inside the MLflow model later
X_encoded = encode_catcolumn(X)
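A quick spot-check (my addition, not part of the original flow) makes the mapping visible:

# the raw labels and their encoded counterparts, row by row
print(X["categorical_column"].head().tolist())          # e.g. ['b', 'a', 'c', ...]
print(X_encoded["categorical_column"].head().tolist())  # e.g. [0, -10, 35, ...]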
Now let's create a wrapper so that the encoding happens inside the model. Note that the encode_catcolumn method inside the class is identical to the standalone function presented above.
class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model

    def encode_catcolumn(self, X):
        X = X.copy()
        # map the categorical values [a, b, c] to [-10, 0, 35] respectively
        X['categorical_column'] = np.select(
            [X["categorical_column"] == "a", X["categorical_column"] == "b", X["categorical_column"] == "c"],
            [-10, 0, 35]
        )
        return X

    def predict(self, context, model_input):
        # encode the categorical variable
        model_input = self.encode_catcolumn(model_input)
        # return the probability of the positive class
        predictions = self.model.predict_proba(model_input)[:, 1]
        return predictions
Now let's log the model:
with mlflow.start_run(run_name="reproducible_example") as run:
    clf = RandomForestClassifier()
    clf.fit(X_encoded, y)
    # wrap the model with pyfunc so the encoding happens inside the class
    wrappedModel = SklearnModelWrapper(clf)
    # note: no input signature is logged alongside the model here
    mlflow.pyfunc.log_model("reproducible_example_model", python_model=wrappedModel)
    model_uuid = run.info.run_id
    model_path = f'runs:/{model_uuid}/reproducible_example_model'
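A quick optional smoke test (my addition): the wrapper can be called directly before the model is ever loaded back, passing None for the context argument, which is safe here because predict never touches it:

# smoke test: context is unused by the wrapper, so None is fine
print(wrappedModel.predict(None, X.head()))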
Doing the inference without Spark works perfectly:
# Load the model as a PyFuncModel.
loaded_model = mlflow.pyfunc.load_model(model_path)
# predictions without Spark: the encoding happens INSIDE the model; this WORKS
loaded_model.predict(X)
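As an optional cross-check (assuming clf and encode_catcolumn are still in scope), the loaded pyfunc model should reproduce the raw sklearn probabilities exactly:

# equivalence check: pyfunc predictions vs. manual encode + predict_proba
assert np.allclose(loaded_model.predict(X), clf.predict_proba(encode_catcolumn(X))[:, 1])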
Now do the inference with spark_udf, which raises an error:
# create spark dataframe to test it on spark
X_spark = spark.createDataFrame(X)
# Load model as a Spark UDF.
loaded_model_spark = mlflow.pyfunc.spark_udf(spark, model_uri=model_path)
# Predict on a Spark DataFrame.
columns = list(X_spark.columns)
# this does not work
X_spark.withColumn('predictions', loaded_model_spark(*columns)).collect()
The error is:
PythonException: An exception was thrown from a UDF: 'KeyError: 'categorical_column'', from <command-908038>, line 7. Full traceback below:
I need to somehow encode the variables and preprocess within the class. Is there any solution to this, or any workaround to make this code work with Spark? What I've tried so far:
I solved this by changing only the last chunk of the question, where the spark_udf model is loaded and inference is performed. It is a possible answer to the problem: pass an F.struct() to the spark_udf instead of a list of columns, as in the chunk below:
import pyspark.sql.functions as F
# create spark dataframe to test it on spark
X_spark = spark.createDataFrame(X)
# Load model as a Spark UDF.
loaded_model_spark = mlflow.pyfunc.spark_udf(spark, model_uri=model_path)
# Predict on a Spark DataFrame.
# columns = list(X_spark.columns) --> delete this
columns = F.struct(X_spark.columns) # use struct
# this does work
X_spark.withColumn('predictions', loaded_model_spark(columns)).collect()
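Why this works: when the UDF is invoked with a list of columns and the model was logged without a signature, MLflow rebuilds the pandas batch with positional column names, so X['categorical_column'] inside encode_catcolumn raises the KeyError shown above. Wrapping the columns in a single F.struct() preserves the original field names, and the wrapper receives a DataFrame with exactly the columns it expects.

An alternative worth knowing about is to attach a model signature at logging time; infer_signature is already imported at the top. The sketch below is my assumption about how that would look, and signature-based column matching in spark_udf varies across MLflow versions, so verify it on your installation; the F.struct() approach remains the version-agnostic fix:

# hedged alternative: log the model WITH an input signature
with mlflow.start_run(run_name="reproducible_example_signed") as run:
    clf = RandomForestClassifier()
    clf.fit(X_encoded, y)
    wrappedModel = SklearnModelWrapper(clf)
    # infer the signature from the raw (un-encoded) input and the wrapper's output
    signature = infer_signature(X, wrappedModel.predict(None, X))
    mlflow.pyfunc.log_model("reproducible_example_model",
                            python_model=wrappedModel,
                            signature=signature)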