apache-spark, pyspark, databricks, spark-streaming, azure-databricks

In PySpark, what is the difference between dlt.read_stream() and spark.readStream?


I am trying to understand the difference between dlt.read_stream() and spark.readStream in PySpark. Can we replace dlt.read_stream() with spark.readStream without affecting the data?

The reason I'm asking is that I want to update a few PII columns to null and then run the job as is, but I am getting this error:

Flow has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table. A Full Refresh will attempt to clear all data from table and then load all data from the streaming source.
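
For context, the kind of change I want to make is an in-place update on the source table, something like this (table and column names are anonymized):

```python
# Nulling out PII columns rewrites existing rows in the source table,
# which violates the append-only requirement for streaming sources.
spark.sql("""
    UPDATE source_table
    SET email = NULL, phone = NULL
""")
```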

I also came across this in the documentation:

The skipChangeCommits flag works only with spark.readStream using the option() function. You cannot use this flag in a dlt.read_stream() function.

So what could be the solution?


Solution

  • There are multiple questions here, and I will answer the one from the title. The other questions relate to issues specific to your setup; please add more detail about your pipeline to the question so we can understand it better.

    spark.readStream is the standard Spark entry point for Structured Streaming: in PySpark it is a property on the SparkSession (not a method call) that returns a DataStreamReader, which you then configure through a chain of calls such as format(), option(), and load().
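
    A minimal sketch of such a read, assuming a JSON source; the path and schema here are placeholders:

    ```python
    # Standard Structured Streaming read: spark.readStream returns a
    # DataStreamReader that is configured through chained calls.
    df = (
        spark.readStream
        .format("json")
        .schema("id INT, event STRING")
        .load("/tmp/events")
    )
    ```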

    dlt.read_stream() is a Databricks-specific method used inside Delta Live Tables (DLT) pipelines to create a streaming read from a streaming table defined in the same pipeline.
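
    A sketch of typical usage, assuming a streaming table named raw_events defined earlier in the same pipeline (both names are placeholders):

    ```python
    import dlt

    @dlt.table
    def filtered_events():
        # dlt.read_stream resolves streaming tables defined in this
        # DLT pipeline, not arbitrary external sources.
        return dlt.read_stream("raw_events").where("event_type IS NOT NULL")
    ```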

    So if you are reading data from a table that is updated in the same pipeline, I don't think you can use spark.readStream for it (at least I don't see that in the documentation).

    The other part of the question answers itself: you can't just delete or update rows in upstream tables and expect downstream streaming tasks to keep working. The skipChangeCommits option would work, but only in the sense that it skips all the change commits in your upstream table, so those changes never reach the downstream table. If you plan to change the upstream table, you will have to do a full refresh of the table rather than just appending data with the stream.
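
    As a sketch, assuming an upstream table named source_table (the name is a placeholder) and a Databricks Runtime recent enough to support the flag, the option is set on the reader like this:

    ```python
    import dlt

    @dlt.table
    def downstream_table():
        # skipChangeCommits tells the stream to ignore commits that
        # update or delete existing rows instead of failing the flow;
        # those changed rows are never propagated downstream.
        return (
            spark.readStream
            .option("skipChangeCommits", "true")
            .table("source_table")
        )
    ```

    Keep in mind that the skipped commits (for example, the nulled PII values) never flow through to the downstream table; if those changes must be reflected there, a full refresh is still required.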