I have noticed some peculiar behaviour with DLT. You can reproduce the exact error by copying the code from the end of this post into a Python file/notebook and creating a new test DLT pipeline.
You should see the same error as this:
TypeError: '>' not supported between instances of 'NoneType' and 'int'
But the error that would be returned in the normal "PySpark world" is:
Exception: specified columns (name) are not unique
Example of duplicate: Row(name='Charlie', count=2)
The error that shows up in DLT is what you would get if the table were empty (0 rows): F.max over zero rows returns None, and comparing None > 1 raises the TypeError. Checking the PK of each table before returning it in a pipeline is a very useful engineering practice, as it often surfaces data-quality (DQ) issues.
If this is not a bug, but a feature of DLT, do you know if it is possible to validate the PK in a table using DLT expectations?
import dlt
from pyspark.sql import functions as F, types as T


@dlt.table(
    name="test1",
    comment="In the world of data, so bright, Databricks and Spark take their flight. With clusters that gleam, They process the stream, In the data lake's shimmering light!"
)
def create_df():
    schema = T.StructType([
        T.StructField("id", T.IntegerType(), True),
        T.StructField("name", T.StringType(), True),
        T.StructField("age", T.IntegerType(), True)])
    data = [(1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35),
            (4, "Charlie", 40)]
    df = spark.createDataFrame(data, schema)
    return df


def check_primary_key(df, columns):
    """
    Verify set of "columns" has unique values in "df"
    """
    count = df.groupby(columns).count()
    max_count = count.agg(F.max(F.col('count')).alias('max_count')).select('max_count').collect()[0][0]
    if max_count > 1:
        example = count.filter(F.col('count') == max_count).first()
        raise Exception(f"Specified columns ({columns}) are not unique.\nExample of duplicate: {str(example)}")


@dlt.table(
    name="test2"
)
def create_df():
    df = dlt.read("test1")
    check_primary_key(df, 'name')
    return df
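
For reference, here is a minimal sketch in plain PySpark (outside DLT) of where the TypeError comes from: on an empty table, F.max over the grouped counts returns None, and None > 1 raises exactly this error. It reuses the spark session and the F import from the code above.

# Sketch only: demonstrates the None comparison on an empty DataFrame
empty_df = spark.createDataFrame([], "name string")
counts = empty_df.groupBy("name").count()
max_count = counts.agg(F.max("count").alias("max_count")).collect()[0][0]
print(max_count)    # None
# max_count > 1     # TypeError: '>' not supported between instances of 'NoneType' and 'int'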
According to the DLT limitations, you cannot use Spark actions (.count(), .collect(), etc.) within a table definition.
However, you can use DLT expectations to implement this check:
@dlt.table(
    name="table1",
)
def create_df():
    schema = T.StructType([
        T.StructField("id", T.IntegerType(), True),
        T.StructField("name", T.StringType(), True),
        T.StructField("age", T.IntegerType(), True)])
    data = [(1, "Alice", 25),
            (1, "Bob", 30),
            (3, "Charlie", 40)]
    df = spark.createDataFrame(data, schema)
    return df


@dlt.table(name="table2")
@dlt.expect("unique_pk", "num_entries = 1")
def create_df():
    df = dlt.read("table1")
    df = df.groupBy("id").count().withColumnRenamed("count", "num_entries")
    return df
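
If you want the pipeline to actually fail when duplicate keys are found, rather than just record the violation in the event log, the same pattern can be tightened with @dlt.expect_or_fail (or @dlt.expect_or_drop). The sketch below is only illustrative; the table name table2_strict is a placeholder.

@dlt.table(name="table2_strict")
@dlt.expect_or_fail("unique_pk", "num_entries = 1")
def create_df():
    # One row per id with its number of occurrences; any id with
    # num_entries > 1 violates the expectation and aborts the update.
    df = dlt.read("table1")
    return df.groupBy("id").count().withColumnRenamed("count", "num_entries")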