Tags: pyspark, databricks, spark-structured-streaming, delta-live-tables

Copy (in Delta format) of an append-only incremental table that lives in JDBC (SQL)


My ultimate goal is to maintain a copy (in Delta format) of an append-only incremental table that lives in a JDBC (SQL) source.

I have a batch process that reads from the incremental, append-only JDBC (SQL) table with spark.read (since .readStream is not supported for JDBC sources). Every day, the most recent day of data is saved as a Delta table. Note that this is not an append-only Delta table; it is overwritten every day with the most recent day of data.
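
For reference, a minimal sketch of that daily batch step might look like the following, assuming a Databricks notebook where spark is already in scope; the source table name, timestamp column, and connection details are hypothetical:

import datetime
from pyspark.sql import functions as F

# Pull only the most recent day of data from the JDBC source.
cutoff = datetime.datetime.now() - datetime.timedelta(days=1)
daily_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
    .option("dbtable", "source_table")  # hypothetical source table name
    .option("user", "<user>")
    .option("password", "<password>")
    .load()
    .filter(F.col("event_ts") >= F.lit(cutoff))  # hypothetical timestamp column
)

# Overwrite the staging Delta table with just that day's rows.
daily_df.write.format("delta").mode("overwrite").saveAsTable("daily_batch")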

What I think I need next is spark.readStream on that Delta table with .option("skipChangeCommits", "true"). The Databricks documentation outlines exactly this.

I have selected the Preview channel in the pipeline settings.

The code for the final streaming table in my DLT pipeline is:

@table(
    name='streaming table',
)
def create_df():
    return spark.readStream.option("skipChangeCommits", "true").table("daily_batch")

However, the error here is:

NameError: name 'table' is not defined,None,Map(),Map(),List(),List(),Map())

In case it is a typo in the documentation, I have also tried @dlt.table, and the error is:

pyspark.errors.exceptions.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view "daily_batch" cannot be found. Verify the spelling and correctness of the schema and catalog.


Solution

  • My solution was simply to use a regular job, not DLT. This simplifies things a lot, since there is no need for an intermediate table. The pipeline was just:

    [JDBC incremental SQL] --Databricks job--> [Delta table, append-only writes].

    I saved the watermark as commit metadata in the Delta table with

    .option("userMetadata", watermark.strftime("%Y-%m-%dT%H:%M:%S"))`
    

    And read it back on each subsequent batch run:

    from delta.tables import DeltaTable
    from pyspark.sql import functions as F
    dt = DeltaTable.forPath(spark, path)
    watermark = dt.history().select(F.col("userMetadata")).first()[0]  # history() is newest-first
    

    I used spark.read.format("sqlserver") to query the JDBC server.
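
    Putting it all together, a minimal sketch of the incremental job might look like this. The table names, timestamp column, and connection options are hypothetical, and spark.read.format("sqlserver") assumes the native SQL Server connector available in recent Databricks Runtimes:

    import datetime
    from delta.tables import DeltaTable
    from pyspark.sql import functions as F

    path = "/mnt/delta/incremental_copy"  # hypothetical Delta table path

    # Read the watermark stored with the previous batch's commit
    # (assumes the table already exists with a watermark from a prior run).
    dt = DeltaTable.forPath(spark, path)
    watermark = datetime.datetime.strptime(
        dt.history().select(F.col("userMetadata")).first()[0],
        "%Y-%m-%dT%H:%M:%S",
    )

    # Pull only the rows newer than the watermark from the SQL server.
    new_rows = (
        spark.read.format("sqlserver")
        .option("host", "<host>")           # connection details elided
        .option("database", "<database>")
        .option("dbtable", "source_table")  # hypothetical source table name
        .option("user", "<user>")
        .option("password", "<password>")
        .load()
        .filter(F.col("event_ts") > F.lit(watermark))  # hypothetical timestamp column
    )

    # Append the new rows and stamp the new watermark on the commit.
    # Using the max ingested timestamp (rather than the wall clock) avoids
    # skipping late-arriving rows; fall back to the old watermark if empty.
    new_watermark = new_rows.agg(F.max("event_ts")).first()[0] or watermark
    (
        new_rows.write.format("delta")
        .mode("append")
        .option("userMetadata", new_watermark.strftime("%Y-%m-%dT%H:%M:%S"))
        .save(path)
    )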