Tags: databricks, delta-lake, kedro

How to dynamically pass save_args to kedro catalog?


I'm trying to write Delta tables in Kedro. Changing the file format to delta writes the data as Delta tables with mode set to overwrite.

Previously, a node in the raw layer (meta_reload) created a dataset that determines the start date for the incremental load of each dataset. Each node uses that raw dataset to filter the working dataset, apply the transformation logic, and write partitioned Parquet tables incrementally.

But now, with only the file format changed to delta, writing with mode overwrite makes the current incremental data overwrite all the past data instead of just the affected partitions. So I need to use the replaceWhere option in save_args in the catalog. How would I determine the start date for replaceWhere in the catalog, when I need to read the meta_reload raw dataset to work out that date? Is there a way to dynamically pass save_args from inside the node?

my_dataset:
  type: my_project.io.pyspark.SparkDataSet
  filepath: "s3://${bucket_de_pipeline}/${data_environment_project}/${data_environment_intermediate}/my_dataset/"
  file_format: delta
  layer: intermediate
  save_args:
    mode: "overwrite"
    replaceWhere: "DATE_ID > xyz"  ## what I want to implement dynamically
    partitionBy: [ "DATE_ID" ]

Solution

  • I've answered this on the GH discussion. In short, you would need to subclass and define your own SparkDataSet. We avoid changing the underlying API of the datasets at a Kedro level, but you're encouraged to alter and remix them for your own purposes; a minimal sketch of the idea follows below.
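
The following is a minimal sketch of that subclassing approach, not an official Kedro API. It assumes a Kedro version where SparkDataSet keeps its save arguments in self._save_args (worth verifying against your release), and it invents a convention where the node returns a (DataFrame, predicate) tuple; the class name and the tuple contract are illustrative.

    from typing import Tuple

    from kedro.extras.datasets.spark import SparkDataSet
    from pyspark.sql import DataFrame


    class DynamicReplaceWhereSparkDataSet(SparkDataSet):
        """SparkDataSet variant whose replaceWhere predicate is supplied by the node."""

        def _save(self, data: Tuple[DataFrame, str]) -> None:
            # The node returns (DataFrame, predicate); the predicate is computed
            # inside the node, e.g. from the meta_reload dataset.
            df, replace_where = data
            # Inject the dynamic predicate alongside the static save_args from the catalog.
            self._save_args["replaceWhere"] = replace_where
            super()._save(df)

The catalog entry would then point type at the custom class (for example my_project.io.pyspark.DynamicReplaceWhereSparkDataSet), keep mode, partitionBy and the other static options in save_args, and the node would return the DataFrame together with a predicate such as "DATE_ID >= '2023-01-01'" derived from meta_reload.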