I have a data lake path as follows:

SYSTEM/Data/Year/Month/Date/Hist/hist.parquet

hist.parquet is present in every Year/Month/Date/Hist folder.

I want to append all the parquets for all years into a single parquet file in PySpark, in a different location. While doing so, i.e. before appending each parquet, I want to create a Date column upload_date reflecting the Year/Month/Date of the path it came from, as YYYY-MM-DD. I also want to remove duplicate rows before each file append. The column names of every parquet file are the same.
My df:

| ID | Team | Flag | Action | Status |
|----|------|------|--------|--------|
for year_folder in year_folders:
month_folders = dbutils.fs.ls(year_folder.path)
How do I achieve this?
I think you can load the data into a DataFrame first and then transform it.
# First, create a DataFrame over every hist.parquet and record each row's source file path.
from pyspark.sql.functions import input_file_name, regexp_extract, regexp_replace, to_date

df = spark.read.format('parquet').load('SYSTEM/Data/*/*/*/Hist/hist.parquet')
df = df.withColumn('filepath', input_file_name())
df = df.dropDuplicates()  # dropping duplicates here is equivalent to dropping them before each append

# Then extract upload_date (the Year/Month/Date part of the path) from filepath.
df = df.withColumn('upload_date', regexp_extract('filepath', r'(\d+/\d+/\d+)/Hist', 1))
df = df.withColumn('upload_date', regexp_replace('upload_date', '/', '-'))
# Optional: cast to a real date; the 'yyyy-M-d' pattern also accepts non-zero-padded months/days.
df = df.withColumn('upload_date', to_date('upload_date', 'yyyy-M-d'))
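
To cover the "write everything to a single parquet in a different location" part of the question, here is a minimal sketch of the final write step. The target path SYSTEM/Consolidated/hist_all is a hypothetical example; note that Spark writes a directory containing one part file when you coalesce to a single partition.

# Drop the helper column and write the combined data as a single parquet file
# under the (assumed) target location.
df = df.drop('filepath')
df.coalesce(1).write.mode('overwrite').parquet('SYSTEM/Consolidated/hist_all')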