spark-streaming, azure-databricks, databricks-sql, delta-live-tables

What are stream and readStream functions in Databricks DLT SQL?


In the Databricks Delta Live Tables documentation, I saw some stream() and readStream() calls in the SQL code examples, but I can't figure out what they are or how they differ. Are these functions documented anywhere?

While spark.readStream exists in Python/Scala/Java/R, I couldn't find any SQL examples. So how do I use it in SQL, and what are its parameters? Sometimes stream() also works in SQL; is it just an alias for readStream(), which in turn is an alias for spark.readStream() in Python?


Solution

  • Actually, it is an alias for reading a Delta streaming table incrementally; the Python equivalent is:

    dlt.read_stream()
    

    The code below creates a Delta Live Table from cloud files:

    import dlt

    @dlt.table
    def streaming_bronze():
      return (
        # Since this is a streaming source, this table is incremental.
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .load("/path/to/raw/data")
      )
    

    To read data from streaming_bronze, you write the following code:

    @dlt.table
    def streaming_silver():
      return dlt.read_stream("streaming_bronze").where(...)
    

    As you can see, dlt.read_stream("streaming_bronze") is used to read the streaming_bronze table.

    To do the same thing in SQL, you use the STREAM function:

    STREAM(LIVE.streaming_bronze)
    

    For more information, see the Delta Live Tables documentation.

    Next, the SQL equivalent of spark.readStream combined with the @dlt.table decorator (which is what creates the streaming table in Python) is shown below.

    CREATE OR REFRESH STREAMING TABLE streaming_bronze
    AS SELECT * FROM cloud_files(
      "path/to/raw/data", "json"
    )
    
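    If you also need to pass Auto Loader options in SQL (the counterpart of the .option() calls in the Python example), cloud_files() accepts an options map as an optional third argument. A minimal sketch; the cloudFiles.inferColumnTypes option is just an illustrative choice:

    CREATE OR REFRESH STREAMING TABLE streaming_bronze
    AS SELECT * FROM cloud_files(
      "path/to/raw/data", "json",
      map("cloudFiles.inferColumnTypes", "true")
    )
    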

    Either statement creates a Delta streaming table. Then, to read its data incrementally, you use STREAM(<table_name>):

    CREATE OR REFRESH STREAMING TABLE streaming_silver
    AS SELECT * FROM STREAM(LIVE.streaming_bronze)
    

    Basically, whenever you want a streaming table to be populated incrementally from another table, you need to read that source with the STREAM function.
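
    To make the distinction concrete, here is a small sketch of two downstream definitions (the gold table names and the amount column are illustrative assumptions, not from the original): reading with STREAM processes only newly arrived rows, while reading LIVE.<table_name> directly recomputes from the full table on each pipeline update.

    -- Incremental: only rows added to streaming_silver since the last update are processed
    CREATE OR REFRESH STREAMING TABLE streaming_gold
    AS SELECT * FROM STREAM(LIVE.streaming_silver)
    WHERE amount > 100

    -- Full recompute: the complete streaming_silver table is read on every update
    CREATE OR REFRESH LIVE TABLE gold_summary
    AS SELECT count(*) AS row_count FROM LIVE.streaming_silver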