I have a Delta table 'targetTable' which has 35 billion records. Every day I get 100 million records from the source, and I have to perform an upsert operation on targetTable.
TargetTable:
As of now, the job takes 45 minutes to 1 hour to complete, and the run time keeps increasing.
Any suggestions on whether to use Z-ordering or file partitioning for better performance, or any other approach?
You can use multithreading. Please find below the sample code that we used for S3 writes; you can adapt the same logic for ADLS writes.
# Read the source rows for [start_date, end_date] from Teradata over JDBC.
#
# Expects these names to be defined earlier in the notebook: spark, server,
# db, username, password, query, schema, start_date, end_date.

# Fix: the URL used to be assigned to `dbcurl` while the reader below looked
# up `jdbcurl`, which raised NameError unless another cell defined that name.
jdbcurl = f"jdbc:teradata://{server}/database={db}, TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"

# Replace the literal placeholders `startdt` / `enddt` in the SQL text with
# single-quoted date strings. Plain concatenation is kept on purpose: it fails
# loudly if start_date / end_date are no longer strings (a later statement in
# this file rebinds them to datetime objects).
query = query.replace("startdt", "'" + start_date + "'").replace("enddt", "'" + end_date + "'")
print(f"Query - {query}")

# NOTE(review): numPartitions is passed without partitionColumn / lowerBound /
# upperBound, so the JDBC read is presumably not split into parallel
# partitions -- confirm against the Spark JDBC data source documentation.
# NOTE(review): schema[1:-1] drops the first and last character of `schema`;
# presumably this strips surrounding brackets or quotes -- confirm.
data_df = (
    spark.read
    .format('jdbc')
    .options(
        url=jdbcurl,
        user=username,
        password=password,
        query=query,
        driver=driver,
        numPartitions=100,
    )
    .option('customSchema', schema[1:-1])
    .option('ConnectionRetries', '3')
    .option('ConnectionRetryInterval', '2000')
    .option("fetchSize", 1000000)
    .load()
)

# count() is an action: it runs the query against the source once just to
# report the number of rows.
print(data_df.count())
# DBTITLE 1,Multithreaded S3 raw/server write
from datetime import timedelta, date,datetime
from concurrent import futures
from pyspark.sql import functions as F
def daterange(start_date, end_date):
    """Yield each day from start_date up to, but not including, end_date.

    Args:
        start_date: First value to yield (a ``date`` or ``datetime``).
        end_date: Exclusive upper bound, same type as ``start_date``.

    Yields:
        ``start_date + timedelta(n)`` for every whole day ``n`` in the span.
        Nothing is yielded when ``end_date`` is on or before ``start_date``.
    """
    # timedelta.days is already an int, so no conversion is needed.
    for n in range((end_date - start_date).days):
        yield start_date + timedelta(n)
def writeS3(curr_date):
    """Write one day's slice of ``data_df`` to the raw and serve locations.

    Reads the module-level names ``data_df``, ``format``, ``partition_column``,
    ``raw_bucket``, ``serve_bucket``, ``db`` and ``table``.

    Args:
        curr_date: Date string; rows are selected with ``dt_id = curr_date``.

    Returns:
        ``curr_date``, so the caller can report which date finished
        (previously the function returned None).
    """
    print(f"Starting S3 write for date - {curr_date}")

    # NOTE(review): the filter is hard-coded to dt_id while replaceWhere uses
    # partition_column -- presumably both name the same column; confirm.
    # The slice is used three times below (count, raw write, serve write), so
    # persist it once instead of re-evaluating the source for each action.
    curr_df = data_df.filter(f"dt_id='{curr_date}'").persist()
    try:
        print(f"curr_date - {curr_date} and count - {curr_df.count()}")

        # Overwrite only the rows matching this date in each target.
        replace_predicate = f"{partition_column}= '{curr_date}'"

        # Raw copy: the slice exactly as read.
        (
            curr_df.write
            .format(format)
            .mode("overwrite")
            .option("replaceWhere", replace_predicate)
            .partitionBy(partition_column)
            .save(f"{raw_bucket}/{db}/{table}/")
        )

        # Serve copy: same rows plus a load-timestamp column.
        serve_df = curr_df.withColumn('az_ld_ts', F.current_timestamp())
        (
            serve_df.write
            .format(format)
            .mode("overwrite")
            .option("replaceWhere", replace_predicate)
            .partitionBy(partition_column)
            .save(f"{serve_bucket}/{db}/{table}/")
        )
    finally:
        # Release the persisted slice even when a write fails.
        curr_df.unpersist()

    print(f"completed for {curr_date}")
    return curr_date
# Fan out one writeS3 call per day in [start_date, end_date) on a thread pool.

# start_date / end_date arrive as "YYYY-MM-DD" strings and are rebound to
# datetime objects here.
start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")
total_days = abs(end_date - start_date).days
print(f"total days - {total_days}. Creating {total_days} threads..")

jobs = []
results_done = []
# Remember which date each future belongs to, so the completion message can
# name the date whatever writeS3 returns.
job_dates = {}

# NOTE(review): the pool size grows with the date range (one worker per day);
# consider capping it for long ranges.
with futures.ThreadPoolExecutor(max_workers=total_days + 1) as e:
    print(f"{raw_bucket}/{db}/{table}/")
    # daterange excludes end_date itself, so exactly total_days jobs are
    # submitted when end_date is after start_date (and none otherwise).
    for single_date in daterange(start_date, end_date):
        curr_date = single_date.strftime("%Y-%m-%d")
        job = e.submit(writeS3, curr_date)
        job_dates[job] = curr_date
        jobs.append(job)

    for job in futures.as_completed(jobs):
        # result() re-raises any exception raised inside the worker.
        job.result()
        result_done = job_dates[job]
        results_done.append(result_done)
        print(f"Job Completed - {result_done}")

print("Task complete")