Tags: apache-spark, databricks, display, spark-structured-streaming

Databricks: structured stream data assignment and display


I have the following streaming code in a Databricks notebook (Python):

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("MyTest") \
    .getOrCreate()

# Create a streaming DataFrame
lines = spark.readStream \
    .format("delta") \
    .table("myschema.streamTest")

In notebook 2, I have:

def foreach_batch_function(df, epoch_id):
    test = df
    print(test['simplecolumn'])
    display(test['simplecolumn'])
    test['simplecolumn'].display

lines.writeStream.outputMode("append").foreachBatch(foreach_batch_function).format('console').start()

When I execute the above, where can I see the output from the display function? I looked in the cluster driver logs and don't see anything. I also don't see anything in the notebook itself when it runs, other than a successfully initialized and executing stream. I do see the data from the DataFrame parameter displayed in the console, but I am trying to verify that the assignment to test was successful.

I am trying to carry out this manipulation as a precursor to time-series operations over mini-batches for real-time model scoring in Python, but I am struggling to get the basics right in the structured streaming world. A working model exists, but it runs only every 10-15 minutes; I would like to make it real-time via streams, hence this question.


Solution

  • You're mixing different things together. I recommend reading the initial parts of the Structured Streaming documentation or chapter 8 of the Learning Spark, 2nd edition book (freely available from here).

    You can use the display function directly on the stream, preferably with the checkpointLocation and possibly trigger parameters, as described in the documentation:

    display(lines)
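
    For example, a minimal sketch with a hypothetical checkpoint path (the exact optional parameters, such as checkpointLocation, are described in the Databricks display documentation; adjust the path to your workspace):

    # hypothetical checkpoint path; display on a stream starts a query, so checkpointing helps it restart cleanly
    display(lines, checkpointLocation="/tmp/checkpoints/display_lines")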
    

    Regarding the scoring: it is usually done by defining a user-defined function and applying it to the stream via the DataFrame's select or withColumn functions. The easiest way is to register a model in the MLflow Model Registry and then load it with the built-in functions, like:

    import mlflow.pyfunc
    
    # load the registered model as a Spark UDF (model_uri points to the model in the registry)
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
    # apply the UDF to the stream, passing the model's input columns in place of params...
    preds = lines.withColumn("predictions", pyfunc_udf(params...))
    

    Look into that notebook for examples.
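
    To tie this back to the original stream, here is a minimal sketch that scores the Delta stream and writes the predictions to another Delta table. The model name and stage, checkpoint path, and output table name are hypothetical placeholders; "simplecolumn" is taken from the question:

    import mlflow.pyfunc

    # Hypothetical URI of a model registered in the MLflow Model Registry
    model_uri = "models:/my_model/Production"
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

    # Score the stream by applying the UDF to the model's input column(s)
    scored = lines.withColumn("predictions", pyfunc_udf("simplecolumn"))

    # Write the scored stream to a (hypothetical) Delta table; a checkpoint location is required
    query = (
        scored.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/tmp/checkpoints/scoring")
        .toTable("myschema.streamTest_scored")
    )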