Tags: pyspark, databricks, azure-databricks, delta-live-tables

Can a Delta Live Table (DLT) be passed as a parameter to a User-Defined Function (UDF) in Databricks?


Databricks' documentation on UDFs shows very simple examples, e.g. integer transformations with integers as parameters (https://docs.databricks.com/spark/latest/spark-sql/udf-python.html), but says nothing about passing Delta Live Tables as parameters.
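For reference, the scalar UDFs those docs show operate on individual column values per row, not on whole tables. A minimal sketch (the name squared is just illustrative; spark is the session the Databricks runtime provides):

from pyspark.sql.types import IntegerType

# scalar UDF: receives one column value per row, returns one value per row
spark.udf.register("squared", lambda x: x * x if x is not None else None, IntegerType())
spark.sql("SELECT id, squared(id) AS sq FROM range(5)").show()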

In my case, two DLTs are being created, and I then need to perform some transformations on them that are only possible with pandas, with the result materialized as a DLT as well.

I need something like this:

@dlt.table
def dlt1():
  return spark.sql("query...")

@dlt.table
def dlt2():
  return spark.sql("query...")

@dlt.table
def dlt3():
  return spark.sql("SELECT * FROM search(LIVE.dlt1, LIVE.dlt2)")

def search(dlt1, dlt2):
  dlt2_not_null = list(dlt2.filter(dlt2["someColumn"].isNotNull()).distinct().toPandas()["someColumn"].values)
  result = dlt1.filter(~dlt1["someColumn"].isin(dlt2_not_null))

  return result

spark.udf.register("search", search)

Solution

  • No, you can't pass Spark or DLT tables as function parameters for use in SQL. (The same is true for "normal" Spark SQL as well.) But really, your function doesn't look like a UDF - it's just a "normal" function that works on two dataframes, so you can easily implement it directly in DLT, like this:

    @dlt.table
    def dlt3():
      dlt1 = dlt.read("dlt1")
      dlt2 = dlt.read("dlt2")
      dlt2_not_null = list(dlt2.filter(dlt2["someColumn"].isNotNull()).distinct().toPandas()["someColumn"].values)
      result = dlt1.filter(~dlt1["someColumn"].isin(dlt2_not_null))
    
      return result
    

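    Note that dlt.read("dlt1") is the Python-side equivalent of referencing LIVE.dlt1 in SQL, and since the whole computation stays inside one table function, no UDF registration is needed. One caveat: .toPandas() collects the dataframe to the driver, so this only scales as far as dlt2's distinct values fit in driver memory.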
    If you have the same pattern in multiple places, then you can programmatically create DLT tables/views as described in the documentation, something like this:

    def make_search(table_name, dlt1_name, dlt2_name):
      @dlt.table(name=table_name)
      def temp_func():
        dlt1 = dlt.read(dlt1_name)
        dlt2 = dlt.read(dlt2_name)
        dlt2_not_null = list(dlt2.filter(dlt2["someColumn"].isNotNull()).distinct().toPandas()["someColumn"].values)
        result = dlt1.filter(~dlt1["someColumn"].isin(dlt2_not_null))
    
        return result
    
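    You would then invoke make_search once per target table; the names below are hypothetical:

    # each call registers one DLT table in the pipeline
    for target, left, right in [
        ("search_ab", "dlt1", "dlt2"),
        ("search_cd", "other_left", "other_right"),
    ]:
        make_search(target, left, right)
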

    Although, frankly speaking, this kind of logic should be doable with normal joins...
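
    For instance, the isin-based filter above can be expressed as a left anti join, which stays fully distributed and avoids collecting anything to the driver. A sketch using the same hypothetical column name:

    @dlt.table
    def dlt3():
      dlt1 = dlt.read("dlt1")
      dlt2 = dlt.read("dlt2")
      # left anti join: keep only dlt1 rows with no matching someColumn in dlt2;
      # NULLs on the right never match, so the explicit isNotNull filter is unneeded.
      # (Unlike the isin variant, rows where dlt1.someColumn is NULL are kept.)
      return dlt1.join(dlt2.select("someColumn"), on="someColumn", how="left_anti")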