Tags: databricks, delta-live-tables

Check Primary Key before returning table in Delta Live Tables


I have noticed some peculiar behaviour with DLT. You can reproduce the exact error by copying the code from the end of this post into a Python file/notebook and creating a new test DLT pipeline.

You should see the same error as this:

TypeError: '>' not supported between instances of 'NoneType' and 'int'

But the error that should be returned in normal "pyspark world" would be:

Exception: Specified columns (name) are not unique.
Example of duplicate: Row(name='Charlie', count=2)

The error that shows up in DLT is what would happen if the table was empty (0 rows). Checking the PK of each table before returning it from a pipeline is a very useful engineering practice, as it often surfaces data-quality (DQ) issues.
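For reference, the same TypeError can be reproduced outside DLT with a plain SparkSession: F.max over a DataFrame with zero rows returns None, and comparing None > 1 raises exactly that error. A minimal standalone sketch (the session setup and the single "name" column here are only illustrative):

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()

# Empty DataFrame with the same kind of key column used below
empty_df = spark.createDataFrame([], T.StructType([T.StructField("name", T.StringType(), True)]))

counts = empty_df.groupBy("name").count()  # 0 rows
max_count = counts.agg(F.max("count").alias("max_count")).collect()[0][0]
print(max_count)  # None
# max_count > 1   # TypeError: '>' not supported between instances of 'NoneType' and 'int'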

If this is not a bug, but a feature of DLT, do you know if it is possible to validate the PK in a table using DLT expectations?

import dlt
from pyspark.sql import functions as F, types as T


@dlt.table(
    name="test1",
    comment="In the world of data, so bright, Databricks and Spark take their flight. With clusters that gleam, They process the stream, In the data lake's shimmering light!"
    )
def create_df():
    schema = T.StructType([
        T.StructField("id", T.IntegerType(), True),
        T.StructField("name", T.StringType(), True),
        T.StructField("age", T.IntegerType(), True)])

    data = [(1, "Alice", 25),
        (2, "Bob", 30),
        (3, "Charlie", 35),
        (4, "Charlie", 40)]

    df = spark.createDataFrame(data, schema)
    return df


def check_primary_key(df, columns):

    """
    Verify set of "columns" has unique values in "df"
    """

    count = df.groupby(columns).count()
    # Largest group size; F.max returns None when "df" has no rows,
    # which is what produces the TypeError seen in the DLT run above
    max_count = count.agg(F.max(F.col('count')).alias('max_count')).select('max_count').collect()[0][0]

    if max_count > 1:
        example = count.filter(F.col('count') == max_count).first()
        raise Exception(f"Specified columns ({columns}) are not unique.\nExample of duplicate: {str(example)}")


@dlt.table(
    name="test2"
)
def create_df():
    df = dlt.read("test1")
    check_primary_key(df, 'name')
    return df

Solution

  • According to the DLT limitations, you cannot use Spark actions (.count(), .collect(), etc.) within a table definition.

    However you can use DLT Expectations to have this check:

    @dlt.table(
        name="table1",
        )
    def create_df():
        schema = T.StructType([
            T.StructField("id", T.IntegerType(), True),
            T.StructField("name", T.StringType(), True),
            T.StructField("age", T.IntegerType(), True)])
    
        data = [(1, "Alice", 25),
            (1, "Bob", 30),
            (3, "Charlie", 40)]
    
        df = spark.createDataFrame(data, schema)
        return df
    
    @dlt.table(name="table2")
    @dlt.expect("unique_pk", "num_entries = 1")
    def create_df():
        df = dlt.read("table1")
        df = df.groupBy("id").count().withColumnRenamed("count","num_entries")
        return df
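
    If you want the pipeline update to fail outright when the key is violated (closer in spirit to the raise in check_primary_key), the same condition can be used with dlt.expect_or_fail, or with dlt.expect_or_drop to drop the offending rows instead. A sketch along the same lines (the table name table2_strict is just illustrative):

    @dlt.table(name="table2_strict")
    @dlt.expect_or_fail("unique_pk", "num_entries = 1")
    def create_df():
        # Aggregate to one row per key; any key with more than one entry
        # violates the expectation and fails the update
        df = dlt.read("table1")
        df = df.groupBy("id").count().withColumnRenamed("count", "num_entries")
        return df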