
How can I register a specific version of a Delta Table in Azure Machine Learning Studio from Azure ADLS Gen 1?


I created a Delta Table in ADLS Gen 1 with the following code in Databricks:

df.write.format("delta").mode("overwrite").saveAsTable("db.my_tbl", path="adl://organisation.azuredatalakestore.net/folder_name/my_data")

Sometimes I re-run the code above to generate a new version of the my_tbl table. As usual with Delta tables, a history is built, and it must regularly be optimized and vacuumed. Now, I often retrain an ML model in Azure Machine Learning Studio and am wondering whether it is possible to register a specific version of the Delta table.
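
For reference, within Databricks itself I can already pin a version with Delta time travel, roughly like this (the version number and timestamp are just examples):

df_v3 = (spark.read.format("delta")
         .option("versionAsOf", 3)  # or .option("timestampAsOf", "2023-01-01")
         .load("adl://organisation.azuredatalakestore.net/folder_name/my_data"))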

Currently, even after vacuuming, all my Delta files (including the older versions) get registered in Azure ML Studio when reading the parquet files from the my_data folder! That is because I cannot lower the retention period of the Delta table below 168 hours without turning off spark.databricks.delta.retentionDurationCheck.enabled, and I do not want to turn it off.
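
To be explicit, this is the restriction I mean (a sketch only, since I do not want to disable the safety check):

# what a shorter retention would require, but I want to keep the check enabled:
# spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM db.my_tbl RETAIN 24 HOURS")  # rejected below 168 hours while the check is on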

I register my dataset through the ML Studio interface as a File Dataset (not a Tabular Dataset). The registration looks like this:

(screenshot of the File Dataset registration in the Azure ML Studio interface)
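
The SDK equivalent of that registration would be roughly the following (the datastore name is a placeholder; note that it registers the whole folder, which is exactly my problem):

from azureml.core import Workspace, Datastore, Dataset

ws = Workspace.from_config()
datastore = Datastore.get(ws, "my_adls_datastore")  # placeholder datastore name

# registers every parquet file under my_data, including files belonging to older Delta versions
dataset = Dataset.File.from_files(path=(datastore, "folder_name/my_data/**/*.parquet"))
dataset.register(workspace=ws, name="my_data_files", create_new_version=True)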

Right now the only option I see is to create a copy of my_data and read that instead. Is there another way? Do you know if I can specify something in the path to point to the "right" .parquet files (those belonging to a specific Delta table version)?


Solution

  • If you are using Databricks and don't mind a small workaround involving moving folders, here is a solution based on manifest files. The script below lists all files of the LATEST version of your Delta table. It can be used in pipelines with incremental data updates.

    spark.conf.set("spark.databricks.delta.symlinkFormatManifest.fileSystemCheck.enabled", False) # this filesystem check is currently an AWS-only feature, so it needs to be disabled on Azure or Google Cloud
    
    from delta.tables import DeltaTable
    
    delta_path = "<full path in the data lake pointing to your table>"  # note: your Databricks workspace needs WRITE access to the data lake!
    
    newpath = "<folder outside the delta table>"
    
    delta_table = DeltaTable.forPath(spark, f"{delta_path}")
    
    # clean up any existing manifest folder, to be on the safe side
    dbutils.fs.rm(f"{newpath}/_symlink_format_manifest", recurse=True)
    
    # generate the symlink manifest; it is written inside the delta table's folder
    delta_table.generate("symlink_format_manifest")
    
    # The automatically created symlink folder needs to be moved out of the delta path!
    # Otherwise the Spark import will not recognize the format, because a regular delta table is expected
    # under this path, and _symlink_format_manifest is not a valid delta table partition.
    
    dbutils.fs.mv(f"{delta_path}/_symlink_format_manifest", f"{newpath}/_symlink_format_manifest", recurse=True)
    
    # create the list of parquet files from the manifest
    filelist=spark.read.text(f"{newpath}/_symlink_format_manifest/*").rdd.flatMap(lambda x: x).collect()
    
    filelist
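
    From there, one possible follow-up (a sketch only, assuming an Azure ML datastore named "my_adls_datastore" that points at the same data lake) is to register exactly those files as a File Dataset:
    
    from azureml.core import Workspace, Datastore, Dataset
    
    ws = Workspace.from_config()
    datastore = Datastore.get(ws, "my_adls_datastore")  # placeholder datastore name
    
    # the manifest lists fully qualified adl:// paths; strip the account prefix so the
    # paths become relative to the datastore root (adjust the split to your actual URIs)
    relative_paths = [p.split(".azuredatalakestore.net/", 1)[1] for p in filelist]
    
    dataset = Dataset.File.from_files(path=[(datastore, p) for p in relative_paths])
    dataset.register(workspace=ws, name="my_tbl_latest_version", create_new_version=True)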