Databricks DLT DataFrame - How to use Schemas
I'm new to Databricks Delta Live Tables and DataFrames, and I'm confused about how to use schemas when reading from the stream. I'm doing table-to-table streaming. One of our requirements is to have comments on the columns in the tables that can be viewed from the DBX Catalog Overview page.
The input table has the following columns (trimmed down for brevity):
from pyspark.sql.types import StructType, StructField, StringType, FloatType

message_schema = StructType([
    StructField("bayId", StringType(), True, {"comment": "Bay ID"}),
    StructField("storeId", StringType(), True, {"comment": "Store ID"}),
    StructField("message", StringType(), True, {"comment": "Message content"}),
])
The message column above contains a stringified JSON object with the following fields (trimmed down for brevity):
event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ]), True),
])
Here's my code to read the table from the stream and build the DF:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

df = spark.readStream.table("tablename")
df = df.where(
    col("MESSAGE").isNotNull()
).select(
    col("BAYID"),
    col("STOREID"),
    F.from_json(col("MESSAGE"), message_schema).alias("MESSAGE")
).select(
    col("BAYID"),
    col("STOREID"),
    F.from_json(col("MESSAGE.message"), event_schema).alias("EVENT")
).select(
    F.expr("uuid()").alias("ID"),
    col("BAYID").alias("BAY_ID"),
    col("STOREID").alias("STORE_ID"),
    col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
    col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
)
After I run this, when I view the Catalog overview of the output table, the PLAYER_DEXTERITY_NAME and ATTACK_ANGLE_NBR columns show the comments that I set in the schema. However, the BAY_ID and STORE_ID columns do not have my comments. (Presumably this is because BAY_ID and STORE_ID are selected from the source table's BAYID and STOREID columns, so the comments in message_schema, which only apply to the fields of the parsed MESSAGE struct, never reach them, while PLAYER_DEXTERITY_NAME and ATTACK_ANGLE_NBR are extracted from the struct that from_json builds with event_schema.)
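A quick way to confirm which columns actually carry comment metadata before the table is written is to print the DataFrame's schema metadata (a minimal sketch):
for field in df.schema.fields:
    # Columns without comments print an empty metadata dict
    print(field.name, field.metadata)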
I was able to get the comments to appear by adding the following code after the block above:
df = (df
    .withMetadata("BAY_ID", {"comment": "Bay ID"})
    .withMetadata("STORE_ID", {"comment": "Store ID"})
)
However, for the sake of consistency, I would like to set the comments in the schema itself. How can I do that? What am I doing wrong?
UPDATE:
In the first answer below, it was suggested to set the schema on the @dlt.table() decorator. However, we need to use @dlt.view(), which does not allow specifying a schema. I do see that there is a schema() method on the DataStreamReader, so I tried this:
df = (
    spark.readStream
    .schema(message_schema)
    .table(f"{bronze_catalog}.{bronze_schema}.{bronze_table_name}")
)
Unfortunately, this made no difference in the output table, presumably because a streaming read from a Delta table uses the table's own schema rather than a user-supplied one.
You can specify the schema in the @dlt.table() decorator, as shown below.
sales_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("number_of_line_items", StringType(), True),
    StructField("order_datetime", StringType(), True),
    StructField("order_number", LongType(), True)
])
@dlt.table(
    comment="Raw data on sales",
    schema=sales_schema)
def sales():
    return ("...")
In your case, define a schema for the DataFrame returned from the DLT function and pass it to the decorator.
from pyspark.sql.types import *
from pyspark.sql import functions as F
import dlt

output_schema = StructType([
    StructField("ID", StringType(), True, {"comment": "Unique identifier for the row"}),
    StructField("BAY_ID", StringType(), True, {"comment": "Bay ID"}),
    StructField("STORE_ID", StringType(), True, {"comment": "Store ID"}),
    StructField("PLAYER_DEXTERITY_NAME", StringType(), True, {"comment": "Player dexterity"}),
    StructField("ATTACK_ANGLE_NBR", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"})
])

event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ]), True),
])
@dlt.table(
    comment="Raw data on sales",
    schema=output_schema)
def sales():
    df = spark.readStream.table("tablename")
    df = df.where(
        F.col("MESSAGE").isNotNull()
    ).select(
        F.col("BAYID"),
        F.col("STOREID"),
        F.from_json(F.col("MESSAGE"), event_schema).alias("EVENT")
    ).select(
        F.expr("uuid()").alias("ID"),
        F.col("BAYID").alias("BAY_ID"),
        F.col("STOREID").alias("STORE_ID"),
        F.col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
        F.col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
    )
    return df
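If the pipeline must use @dlt.view(), which has no schema parameter, one option is to apply the same metadata on the DataFrame itself before returning it, building on the withMetadata() approach that already worked for you. A minimal sketch, with a hypothetical helper name, reusing output_schema from above:
def apply_schema_comments(df, schema):
    # Copy each field's metadata (including the "comment" key)
    # onto the matching DataFrame column
    for field in schema.fields:
        if field.name in df.columns and field.metadata:
            df = df.withMetadata(field.name, field.metadata)
    return df

@dlt.view(comment="Raw data on sales")
def sales_view():
    df = spark.readStream.table("tablename")
    # ... same transformations as above ...
    return apply_schema_comments(df, output_schema)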
Below is the sample data used.
sample_data = [
    ("BAY001", "STORE123", '{"Id":"E001", "Payload":{"PlayerDexterity":"High", "AttackAngle":45.5}}'),
    ("BAY002", "STORE456", '{"Id":"E002", "Payload":{"PlayerDexterity":"Medium", "AttackAngle":30.0}}'),
    ("BAY003", "STORE789", '{"Id":"E003", "Payload":{"PlayerDexterity":"Low", "AttackAngle":15.2}}')
]
Output: the resulting table contains the parsed rows, and the column comments appear in the Catalog Overview tab.