Tags: dataframe, azure-databricks, databricks-sql, delta-live-tables

Databricks DLT DataFrame - How to use Schemas with Comments


I'm new to Databricks Delta Live Tables and DataFrames, and I'm confused about how to use schemas when reading from a stream. I'm doing table-to-table streaming. One of our requirements is to have comments on the table columns that can be viewed from the DBX Catalog Overview page.

The input table has the following columns (trimmed down for brevity):

from pyspark.sql.types import StructType, StructField, StringType, FloatType

message_schema = StructType([
    StructField("bayId", StringType(), True, {"comment": "Bay ID"}),
    StructField("storeId", StringType(), True, {"comment": "Store ID"}),
    StructField("message", StringType(), True, {"comment": "Message content"}),
])

The message column above contains a stringified JSON object, with the following fields (trimmed down for brevity):

event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ]), True),
])

Here's my code to read the table from the stream and build the DF:

    from pyspark.sql import functions as F
    from pyspark.sql.functions import col

    df = spark.readStream.table("tablename")

    df = df.where(
        col("MESSAGE").isNotNull()
    ).select(
        col("BAYID"),
        col("STOREID"),
        F.from_json(col("MESSAGE"), message_schema).alias("MESSAGE")
    ).select(
        col("BAYID"),
        col("STOREID"),
        F.from_json(col("MESSAGE.message"), event_schema).alias("EVENT")
    ).select(
        F.expr("uuid()").alias("ID"),
        col("BAYID").alias("BAY_ID"),
        col("STOREID").alias("STORE_ID"),
        col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
        col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
    )

After I run this and view the Catalog overview of the output table, the PLAYER_DEXTERITY_NAME and ATTACK_ANGLE_NBR columns show the comments I set in the schema, but the BAY_ID and STORE_ID columns do not. (The two comments that survive come from event_schema via from_json; BAYID and STOREID are selected directly from the source table, so the comments on message_schema's fields stay on the parsed MESSAGE struct, which is discarded.)
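One quick way to see which output columns actually carry comment metadata before the table is written is to print each field's metadata from the DataFrame schema; a minimal sketch, using the df built above:

    # Columns whose comments were dropped show an empty dict here.
    for field in df.schema.fields:
        print(field.name, field.metadata)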

I was able to add the comments by appending the following code after the block above:

    df = (df
          .withMetadata("BAY_ID", {"comment": "Bay ID"})
          .withMetadata("STORE_ID", {"comment": "Store ID"})
          )

However, for the sake of consistency, I would like to set the comments in the schema itself. How can I do that? What am I doing wrong?
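For what it's worth, Column.alias also accepts a metadata dict, so the comments could be attached at the point of renaming instead of in a separate withMetadata() pass; a minimal sketch:

    from pyspark.sql.functions import col

    # Attach the comment metadata while renaming, in the final select.
    df = df.select(
        col("BAYID").alias("BAY_ID", metadata={"comment": "Bay ID"}),
        col("STOREID").alias("STORE_ID", metadata={"comment": "Store ID"}),
    )

But this still duplicates the comments outside the schema, which is what I'm trying to avoid.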

UPDATE:

In the first answer below, it was suggested to set the schema in the @dlt.table() decorator. However, we need to use @dlt.view(), which does not allow specifying a schema.

However, I see that DataStreamReader has a schema() method, so I tried this:

    df = (
        spark.readStream
        .schema(message_schema)
        .table(
            f"{bronze_catalog}.{bronze_schema}.{bronze_table_name}",
        )
    )

Unfortunately, this made no difference in the output table.
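Since @dlt.view() takes no schema argument, the best schema-driven workaround I can think of is to keep one StructType as the single source of truth and copy its per-field comments onto the DataFrame with withMetadata (Spark 3.3+). A sketch, assuming the output column names match the schema field names:

    def apply_schema_comments(df, schema):
        # Copy per-field metadata (e.g. comments) from a StructType
        # onto the matching columns of df.
        for field in schema.fields:
            if field.name in df.columns and field.metadata:
                df = df.withMetadata(field.name, field.metadata)
        return df

That at least keeps the comments defined in one place, even if they are not applied at read time.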


Solution

  • You can pass the schema in the @dlt.table decorator, as shown below.

    sales_schema = StructType([
      StructField("customer_id", StringType(), True),
      StructField("customer_name", StringType(), True),
      StructField("number_of_line_items", StringType(), True),
      StructField("order_datetime", StringType(), True),
      StructField("order_number", LongType(), True)
    ])
    
    @dlt.table(
      comment="Raw data on sales",
      schema=sales_schema)
    def sales():
      return ("...")
    
    

    In your case, create a schema for the DataFrame returned from the DLT function and use it:

    from pyspark.sql.types import *
    from pyspark.sql import functions as F
    import dlt
    
    output_schema = StructType([
        StructField("ID", StringType(), True, {"comment": "Unique identifier for the row"}),
        StructField("BAY_ID", StringType(), True, {"comment": "Bay ID"}),
        StructField("STORE_ID", StringType(), True, {"comment": "Store ID"}),
        StructField("PLAYER_DEXTERITY_NAME", StringType(), True, {"comment": "Player dexterity"}),
        StructField("ATTACK_ANGLE_NBR", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"})
    ])
    
    event_schema = StructType([
        StructField("Id", StringType(), True, {"comment": "Event ID"}),
        StructField("Payload", StructType([
            StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
            StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
        ]), True),
    ])
    
    
    @dlt.table(
      comment="Raw data on sales",
      schema=output_schema)
    def sales():
      df = spark.readStream.table("tablename")
    
      df = (
          df.where(F.col("MESSAGE").isNotNull())
          .select(
              F.col("BAYID"),
              F.col("STOREID"),
              F.from_json(F.col("MESSAGE"), event_schema).alias("EVENT")
          )
          .select(
              F.expr("uuid()").alias("ID"),
              F.col("BAYID").alias("BAY_ID"),
              F.col("STOREID").alias("STORE_ID"),
              F.col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
              F.col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
          )
      )
      return df
    

    Below is the sample data used.

    sample_data = [
        ("BAY001", "STORE123", '{"Id":"E001", "Payload":{"PlayerDexterity":"High", "AttackAngle":45.5}}'),
        ("BAY002", "STORE456", '{"Id":"E002", "Payload":{"PlayerDexterity":"Medium", "AttackAngle":30.0}}'),
        ("BAY003", "STORE789", '{"Id":"E003", "Payload":{"PlayerDexterity":"Low", "AttackAngle":15.2}}'),
    ]
    

    Output:

    [screenshot: resulting table rows]

    And the comments in the Overview tab:

    [screenshot: column comments on the Catalog Overview page]