Tags: python, scala, databricks, azure-databricks

How to read parquet files for the latest version of a Delta table?


We have around 100 GB of data stored across 200 parquet files. To save time, we run 10 jobs in parallel (each reading 20 files).

But because the Delta table maintains history, every new version adds another 200 parquet files, so our logic of reading files and pushing the data no longer works. Is there any way to read only the files for the latest version, or to keep only one (latest) version?

import re

directory_path = "abfss://path/<table>"

# Get the list of files within the directory
file_list = dbutils.fs.ls(directory_path)
file_process = []
for index, file_info in enumerate(file_list):
    if file_info.isFile():
        file_path = file_info.path
        file_name = file_info.name

        # Pull the numeric suffix out of the "part-NNNNN" file name
        pattern = r'part-\d+'
        match = re.search(pattern, file_name)
        numeric_portion = 0
        if match:
            file_name = match.group()
            numeric_portion = file_name[5:]
            print(numeric_portion)

        if 0 <= int(numeric_portion) <= 20:
            # this way we read 20 files in each job
            file_process.append(file_path)

Solution

  • Using Spark, you get the data for the latest version by default:

    spark.read.format("delta").load("/user/")
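
    Conversely, if you ever need an older snapshot instead of the latest one, Delta time travel lets you pin a version explicitly. A minimal sketch, assuming the same "/user/" table path and that version 0 exists:

    # Read an explicit historical version instead of the implicit latest one
    spark.read.format("delta").option("versionAsOf", 0).load("/user/")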
    

    To get the file path of each record:

    spark.read.format("delta").load("/user/").select("*","_metadata.file_path")
    

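    Because _metadata.file_path only lists files that belong to the current version, you can also rebuild the "20 files per job" split from the question on top of it, without ever touching the historical files. A minimal sketch, assuming the same "/user/" table path and a hypothetical job_index parameter (0-9) passed to each job:

    # Collect the data files that make up the latest version only
    latest_files = [
        r.file_path
        for r in spark.read.format("delta").load("/user/")
                      .select("_metadata.file_path").distinct().collect()
    ]

    # Each of the 10 jobs reads its own slice of 20 files
    job_index = 0  # hypothetical per-job parameter
    chunk = latest_files[job_index * 20 : (job_index + 1) * 20]
    df = spark.read.parquet(*chunk)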

    If you don't want to use Spark, you can use the code below to get the files of the latest version.

    import json
    import pandas as pd

    # List the .crc checksum files in the Delta transaction log
    crcf = [f.path for f in dbutils.fs.ls("user/_delta_log") if f.path.endswith(".crc")]

    # Version numbers are zero-padded, so the lexicographic max is the latest version;
    # turn the "dbfs:/..." URI into a local "/dbfs/..." path so open() can read it
    max_ver_path = "/" + max(crcf).replace(":", "")
    print(max_ver_path)

    # The checksum file is JSON and lists the data files of that version under "allFiles"
    tmp = json.load(open(max_ver_path))
    latest_paths = ["user/" + f["path"] for f in tmp["allFiles"]]
    latest_paths
    


    Read those files using pandas.

    df = pd.concat([pd.read_parquet("/dbfs/"+f) for f in latest_paths], ignore_index=True)
    df
    


    You can also convert that back into a Spark DataFrame.

    spark.createDataFrame(df)
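
    For the second part of the question, keeping only the latest version, Delta's VACUUM command removes data files that are no longer referenced by the current version and are older than the retention threshold. A minimal sketch, assuming the same "/user/" table path; note that lowering the retention below the default 7 days gives up time travel to older versions, so use it with care:

    # Allow a retention window shorter than the default 168 hours (7 days)
    spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

    # Delete data files that are not part of the latest version
    spark.sql("VACUUM delta.`/user/` RETAIN 0 HOURS")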