I created a Delta Table in ADLS Gen 1 with the following code in Databricks:
df.write.format("delta").mode("overwrite").saveAsTable("db.my_tbl", path ='adl://organisation.azuredatalakestore.net/folder_name/my_data')
Sometimes I re-run the code above to generate a new version of the my_tbl table. As usual with Delta tables, a history is built, and the table must regularly be optimized and vacuumed. I often retrain an ML model in Azure Machine Learning Studio and am wondering whether it is possible to register a specific version of the Delta table.
Currently, even after vacuuming, all my Delta files (including older versions) are registered in Azure ML Studio when it reads the parquet files from the my_data folder. That is because I cannot lower the retention period of the Delta table below 168 hours without turning off spark.databricks.delta.retentionDurationCheck.enabled, and I do not want to turn it off.
I register my dataset through the ML Studio Interface as a File Dataset (not Tabular Dataset). This registration looks like this:
Right now the only option I see is to create a copy of my_data and read that instead. Is there another way? Can I specify something in the path to point to the "right" .parquet files (those belonging to a specific Delta table version)?
If you are using Databricks and don't mind some workaround when moving folders, here is a solution based on manifest files. This script lists all files of your LATEST delta table version. It can be used for pipelines with incremental data updates.
# This filesystem check is currently an AWS feature, so it needs to be disabled on Azure or Google Cloud.
spark.conf.set("spark.databricks.delta.symlinkFormatManifest.fileSystemCheck.enabled", False)
from delta.tables import DeltaTable
delta_path = "<full path in data lake pointing to your table>"  # note: your Databricks workspace needs WRITE access to the data lake!
newpath = "<folder outside delta table>"
delta_table = DeltaTable.forPath(spark, f"{delta_path}")
#clean up existing folder to be on the safe side
dbutils.fs.rm(f"{newpath}/_symlink_format_manifest", recurse=True)
# generate() writes the manifest under delta_path and returns nothing, so there is no result to keep
delta_table.generate("symlink_format_manifest")
# The automatically created symlink folder needs to be moved out of the delta path!
# Otherwise a Spark read of this path will not recognize the format, because a regular Delta table is expected there
# and _symlink_format_manifest is not a valid Delta table partition.
dbutils.fs.mv(f"{delta_path}/_symlink_format_manifest", f"{newpath}/_symlink_format_manifest", recurse=True)
# create the list of parquet files from the manifest
filelist=spark.read.text(f"{newpath}/_symlink_format_manifest/*").rdd.flatMap(lambda x: x).collect()
filelist
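The manifest above only covers the latest version. For a specific older version, one possible approach is to replay the JSON commit files in the table's _delta_log folder, where each line is an action such as "add" or "remove" with a file path relative to the table root. The following is only a minimal sketch under several assumptions: the function name active_files_at_version is hypothetical, the log folder is reachable as a local path, all JSON commits from version 0 up to the requested version still exist (log cleanup can remove old ones), and checkpoint files are ignored. The demo builds a tiny fake log in a temporary folder instead of touching a real table.

```python
import json
import os
import tempfile


def active_files_at_version(log_dir, version):
    """Replay JSON commits 0..version and return the data files live at that version.

    Hypothetical helper: assumes every commit file is present and ignores checkpoints.
    """
    active = set()
    for v in range(version + 1):
        # Commit files are named with a zero-padded 20-digit version number.
        commit_path = os.path.join(log_dir, f"{v:020d}.json")
        with open(commit_path, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                action = json.loads(line)
                if "add" in action:
                    active.add(action["add"]["path"])
                elif "remove" in action:
                    active.discard(action["remove"]["path"])
    return sorted(active)


# Demo with a fake two-commit log (file names are made up for illustration).
demo_log = tempfile.mkdtemp()
commits = [
    # Version 0: initial write adds one file.
    [{"commitInfo": {"operation": "WRITE"}},
     {"add": {"path": "part-00000-a.snappy.parquet", "dataChange": True}}],
    # Version 1: overwrite removes the old file and adds a new one.
    [{"commitInfo": {"operation": "WRITE"}},
     {"remove": {"path": "part-00000-a.snappy.parquet", "dataChange": True}},
     {"add": {"path": "part-00000-b.snappy.parquet", "dataChange": True}}],
]
for v, actions in enumerate(commits):
    with open(os.path.join(demo_log, f"{v:020d}.json"), "w", encoding="utf-8") as fh:
        fh.write("\n".join(json.dumps(a) for a in actions))

files_v0 = active_files_at_version(demo_log, 0)
files_v1 = active_files_at_version(demo_log, 1)
print(files_v0)
print(files_v1)
```

Note that a file listed for an older version is only usable if VACUUM has not already deleted it, and that paths in the log are URL-encoded, which matters for partition folders with special characters.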