apache-spark, merge, databricks, azure-databricks, delta-lake

Performance improvement on Databricks Delta table while performing merge operation (upsert)


I have a Delta table 'targetTable' which has 35 billion records. Every day I get 100 million records from the source, and I have to perform an upsert operation on targetTable.

TargetTable:

  • 25 columns
  • 25 billion records
  • 1 column, commID, which is unique for each row

As of now it takes 45 min to 1 hr to complete the job, and the run time keeps increasing.

Any suggestions on whether to use Z-ordering or file partitioning for better performance, or any other approach?
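
For context, the daily upsert is essentially a Delta MERGE keyed on commID, roughly the shape below (a sketch only: daily_source is a placeholder temp view, and the real statement covers all 25 columns):

    # Rough shape of the daily upsert; daily_source is a placeholder view.
    spark.sql("""
      MERGE INTO targetTable t
      USING daily_source s
        ON t.commID = s.commID
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)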


Solution

  • You can use parallel threading. Please find the sample code we used for S3 writes; you can tweak the same logic for ADLS writes.

    # Build the Teradata JDBC source and read it in parallel partitions.
    jdbcurl = f"jdbc:teradata://{server}/database={db}, TMODE=TERA"
    driver = "com.teradata.jdbc.TeraDriver"
    query = query.replace("startdt", "'" + start_date + "'").replace("enddt", "'" + end_date + "'")
    print(f"Query - {query}")
    data_df = spark.read \
              .format('jdbc') \
              .options(url=jdbcurl, user=username, password=password, query=query, driver=driver, numPartitions=100) \
              .option('customSchema', schema[1:-1]) \
              .option('ConnectionRetries', '3') \
              .option('ConnectionRetryInterval', '2000') \
              .option("fetchSize", 1000000) \
              .load()
    
    print(data_df.count())
    
    
    
    
    # DBTITLE 1,Multithreaded S3 raw/server write
    from datetime import timedelta, date, datetime
    from concurrent import futures
    from pyspark.sql import functions as F
    
    def daterange(start_date, end_date):
        for n in range(int((end_date - start_date).days)):
            yield start_date + timedelta(n)
    
    def writeS3(curr_date):
      # format, partition_column, raw_bucket, serve_bucket, db and table are
      # assumed to be defined elsewhere in the notebook (widgets/config).
      print(f"Starting S3 write for date - {curr_date}")
      curr_df = data_df.filter(f"dt_id='{curr_date}'")
      print(f"curr_date - {curr_date} and count - {curr_df.count()}")
      curr_df.write.format(format).mode("overwrite").option("replaceWhere", f"{partition_column} = '{curr_date}'").partitionBy(partition_column).save(f"{raw_bucket}/{db}/{table}/")
      serve_df = curr_df.withColumn('az_ld_ts', F.current_timestamp())
      serve_df.write.format(format).mode("overwrite").option("replaceWhere", f"{partition_column} = '{curr_date}'").partitionBy(partition_column).save(f"{serve_bucket}/{db}/{table}/")
      print(f"completed for {curr_date}")
      return curr_date  # returned so the as_completed loop can report which date finished
    
    
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")
    total_days = abs(end_date-start_date).days
    print(f"total days - {total_days}. Creating {total_days} threads..")
    
    jobs = []
    results_done = []
    
    with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
      print(f"{raw_bucket}/{db}/{table}/")
      for single_date in daterange(start_date, end_date):
        curr_date = single_date.strftime("%Y-%m-%d")
        jobs.append(e.submit(writeS3, curr_date))
    
      for job in futures.as_completed(jobs):
        result_done = job.result()
        print(f"Job Completed - {result_done}")
    
    print("Task complete")