I have following stream code in a databricks notebook (python).
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("MyTest") \
.getOrCreate()
# Create a streaming DataFrame
lines = spark.readStream \
.format("delta") \
.table("myschema.streamTest")
In notebook 2, I have
def foreach_batch_function(df, epoch_id):
test = df
print(test['simplecolumn'])
display(test['simplecolumn'])
test['simplecolumn'].display
lines.writeStream.outputMode("append").foreachBatch(foreach_batch_function).format('console').start()
When I execute the above where can I see the output from the .display function? I looked inside the cluster driver logs and I don't see anything. I also don't see anything in the notebook itself when executed except a successfully initialized and executing stream. I do see that the dataframe parameter data is displayed in console but I am trying to see that assigning test was successful.
I am trying to carry out this manipulation as a precursor to time series operations over mini batches for real-time model scoring and in python - but I am struggling to get the basics right in the structured streaming world. A working model functions but executes every 10-15 minutes. I would like to make it realtime via streams and hence this question.
You're mixing different things together - I recommend to read initial parts of the structured streaming documentation or chapter 8 of Learning Spark, 2ed book (freely available from here).
You can use display
function directly on the stream, like (better with checkpointLocation
and maybe trigger
parameters as described in documentation):
display(lines)
Regarding the scoring - usually it's done by defining the user defined function and applying it to stream either as select
or withColumn
functions of the dataframe. Easiest way is to register a model in the MLflow registry, and then load model with built-in functions, like:
import mlflow.pyfunc
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
preds = lines.withColumn("predictions", pyfunc_udf(params...))
Look into that notebook for examples.