
How to apply MLFlow prediction model in a stream?


I have a stream that reads feature data, ready for scoring, and passes it to an already registered model. All code is in Python. The model and metadata below work outside of a stream, in a regular notebook; inside a stream it is another matter. The main issue is that the data written out of the stream (into the destination table) has a NULL prediction. The other issue is that the foreachBatch function seems unresponsive, even to deliberately planted syntax errors. There is no sign of a problem in the logs or in the notebook feedback. It is as if it is not being called.

I realise I am writing to the table twice (once in the function and once in the writeStream). Only one record makes it, and it is from the writeStream, not the function.

Code is below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("MyTest") \
    .getOrCreate()

# Create a streaming DataFrame
lines = spark.readStream \
    .format("delta") \
    .option('ignoreDeletes','true') \
    .table("schema.transformeddata") 

fixedValueStream = lines.select('feature1','feature2', 'feature3')


# Score each micro-batch with the MLflow model
def batchpredictions(df, epoch_id):
    
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
    prediction = df.withColumn("prediction", pyfunc_udf(struct('feature1','feature2','feature3')))
    prediction.write.mode("append").saveAsTable("schema.transformeddata_prediction")
    

fixedValueStream.writeStream.format("delta").outputMode("append").foreachBatch(batchpredictions).option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").table("schema.transformeddata_prediction")

Incoming data :

feature1, feature2, feature3
1       , 5       , 9
2       , 6       , 10
3       , 7       , 11
4       , 8       , 12

Outgoing data

feature1, feature2, feature3, prediction
1       , 5       , 9       , NULL
2       , 6       , 10      , NULL
3       , 7       , 11      , NULL
4       , 8       , 12      , NULL

Any clues to what I am doing wrong?

*UPDATE: Thank you to Mike for your responses. I aim to begin optimizing my solution below using some of the things you've suggested. For now I merely needed to get something to work to fall back on. The solution in its current state is below.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import *
import mlflow
import mlflow.xgboost
import xgboost
import numpy as np
import pandas as pd
from pyspark.sql.types import *

# Load the model as a generic pyfunc model (not a Spark UDF)
loaded_model = mlflow.pyfunc.load_model('runs:/<mymodelrunid>/model')

spark = SparkSession \
    .builder \
    .appName("MyTest") \
    .getOrCreate()

# Create a streaming DataFrame
lines = spark.readStream \
    .format("delta") \
    .option('ignoreDeletes','true') \
    .table("<myschema>.<mytableinput>") 
    
fixedValueStream = lines.select('feature1','feature2', 'feature3', 'feature4', 'feature5')

def foreach_batch_function(df, epoch_id):
    #text value of the multi class prediction GREEN, RED, BLUE
    df = df.withColumn("pred_class", lit('    '))
    
    #Prepare 3 holders for the 3 class scores returned from the multiclass model.
    #Done beforehand so I don't have to deal with data type/additional column index/key issues.
    df = df.withColumn("prediction_class1", lit(0.00).cast("double"))
    df = df.withColumn("prediction_class2", lit(0.00).cast("double"))
    df = df.withColumn("prediction_class3", lit(0.00).cast("double"))
    
    #Select back into pandas frame
    pd_df = df.select('feature1','feature2', 'feature3', 'feature4', 'feature5','pred_class','prediction_class1','prediction_class2','prediction_class3').toPandas()

    #Pass pandas frame into model and return array of shape [<batch-df-rows-count>][3]
    y_pred = loaded_model.predict(pd_df)
    
    #Return the index of the highest-scoring column
    predicted_idx = np.argmax(y_pred, axis=1)
    
    #Translate said column into end user labels 
    y_pred_class = np.where(predicted_idx == 1, 'GREEN', np.where(predicted_idx == 0, 'RED', 'BLUE' ))
    
    #Assign class to place holder column
    pd_df["pred_class"] = y_pred_class

    #Store the 3 prediction strengths into place holder columns
    pd_df["prediction_class1"] = y_pred[:,0]
    pd_df["prediction_class2"] = y_pred[:,1]
    pd_df["prediction_class3"] = y_pred[:,2]
    
    #Write out back to a monitoring table
    result = spark.createDataFrame(pd_df)
    result.write.option("mergeSchema","true").format("delta").mode("append").saveAsTable("<myschema>.<mytableoutput>")
    
#write stream out
fixedValueStream.writeStream.foreachBatch(foreach_batch_function).start()
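
The nested np.where calls in foreach_batch_function can also be written as a lookup into an array of labels. The following is only a sketch: the helper name label_predictions and the constant CLASS_LABELS are made up for illustration, and the label order is an assumption read off the np.where mapping above (column 0 is RED, column 1 is GREEN, column 2 is BLUE).

```python
import numpy as np

# Assumed label order, taken from the np.where mapping in the update:
# column 0 -> RED, column 1 -> GREEN, column 2 -> BLUE.
CLASS_LABELS = np.array(["RED", "GREEN", "BLUE"])


def label_predictions(y_pred):
    """Return the label of the highest-scoring class for each row of scores."""
    scores = np.asarray(y_pred)
    # argmax gives the winning column per row; indexing maps it to its label.
    return CLASS_LABELS[np.argmax(scores, axis=1)]
```

Inside the batch function this would replace the predicted_idx and y_pred_class lines, for example pd_df["pred_class"] = label_predictions(y_pred).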

Solution

  • As @AlexOtt pointed out in the comments, there is no need to use foreachBatch for your question as it is currently written.

    All you need to do is apply the UDF to your streaming DataFrame using withColumn.


    In case you do need to use foreachBatch, for example because you are writing to a sink format that does not support streaming, you can read below how to do it.

    According to the documentation on foreachBatch in the Structured Streaming Programming Guide, you do not need a format and an outputMode in your final writeStream. Instead, the logic that determines where the data is written is defined within the foreachBatch function. Using saveAsTable inside a stream also does not look right.

    Overall, your code should look like this:

    import mlflow.pyfunc
    from pyspark.sql.functions import struct

    def batchpredictions(df, epoch_id):
        # Load the registered model as a Spark UDF
        pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
        # Add the prediction column and append the micro-batch to the Delta table
        prediction = df.withColumn("prediction", pyfunc_udf(struct('feature1', 'feature2', 'feature3')))
        prediction.write.mode("append").format("delta").save("/tmp/delta-table")


    fixedValueStream.writeStream.foreachBatch(batchpredictions).option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").start()
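
For the first option (applying the UDF directly to the streaming DataFrame, without foreachBatch), a minimal sketch might look like the following. The function name start_prediction_stream and its parameters are hypothetical and only serve the illustration. It assumes Spark 3.1 or later, where DataStreamWriter.toTable is available, and it writes to a table rather than to a path. The imports sit inside the function so that the snippet itself has no import-time dependency on Spark or MLflow.

```python
def start_prediction_stream(spark, model_uri, source_table, target_table,
                            checkpoint_path, feature_cols):
    """Score a streaming Delta table with an MLflow model and append the result to a table."""
    # Imported here so the snippet has no import-time dependency on Spark or MLflow.
    import mlflow.pyfunc
    from pyspark.sql.functions import struct

    # Build the prediction UDF once, before the stream starts.
    predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

    # Read the source table as a stream and keep only the feature columns.
    features = (spark.readStream
                .format("delta")
                .option("ignoreDeletes", "true")
                .table(source_table)
                .select(*feature_cols))

    # withColumn works on a streaming DataFrame just as it does on a batch one.
    scored = features.withColumn("prediction", predict_udf(struct(*feature_cols)))

    # toTable starts the query and returns the StreamingQuery handle.
    return (scored.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint_path)
            .toTable(target_table))
```

With the names from the question, it could be called as start_prediction_stream(spark, 'runs:/<myrunid>/model', 'schema.transformeddata', 'schema.transformeddata_prediction', '/delta/events/_checkpoints/etl-from-json', ['feature1', 'feature2', 'feature3']).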