We have a large dataset of around 100 GB stored across 200 parquet files. To save time, we run 10 jobs in parallel (each job reading 20 files).
But because the Delta table maintains history, after each new version it adds 200 more parquet files, so our logic of reading the files and pushing the data no longer works. Is there any way to read only the files of the latest version, or to maintain only one (latest) version?
import re

directory_path = "abfss://path/<table>"

# Get the list of files within the directory
file_list = dbutils.fs.ls(directory_path)
file_process = []
for index, file_info in enumerate(file_list):
    if file_info.isFile():
        file_path = file_info.path
        file_name = file_info.name
        pattern = r'part-\d+'
        match = re.search(pattern, file_name)
        numeric_portion = 0
        if match:
            file_name = match.group()
            numeric_portion = file_name[5:]
            print(numeric_portion)
        if int(numeric_portion) >= 0 and int(numeric_portion) <= 20:
            # this way we read 20 files in each job
            file_process.append(file_path)
With Spark, you get the data for the latest version by default:
spark.read.format("delta").load("/user/")
To get the file path for each row:
spark.read.format("delta").load("/user/").select("*", "_metadata.file_path")
If you don't want to use Spark, you can use the code below to get the files of the latest version.
import json
import pandas as pd

# Find the checksum (.crc) file of the latest version in the Delta log.
crc_files = [f.path for f in dbutils.fs.ls("user/_delta_log") if f.path.endswith(".crc")]
max_ver_path = "/" + max(crc_files).replace(":", "")  # dbfs:/... -> /dbfs/... local path
print(max_ver_path)

# The checksum file lists all data files that make up that version.
tmp = json.load(open(max_ver_path))
latest_paths = ["user/" + f["path"] for f in tmp["allFiles"]]
latest_paths
Read those files using pandas.
df = pd.concat([pd.read_parquet("/dbfs/"+f) for f in latest_paths], ignore_index=True)
df
You can also convert it into a Spark DataFrame.
spark.createDataFrame(df)
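As a possible shortcut (this is an assumption on my part, not something covered above): DataFrame.inputFiles() returns a best-effort list of the files backing a DataFrame, which for a Delta table should be the data files of the latest snapshot.

# Assumption: inputFiles() on a Delta-backed DataFrame lists the files of the latest snapshot.
latest_files = spark.read.format("delta").load("/user/").inputFiles()
print(len(latest_files))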