Tags: azure | pyspark | automation | azure-blob-storage | parquet

Extracting data from blob storage to Databricks [automation]


I have blob data organized into nested folders by year, month and date, refreshed daily. I need to design a pipeline that efficiently loads the historical data from blob storage into Azure Databricks. Could you please guide me on the proper way of storing the daily and historical data in Databricks?

Steps followed: created a mount point with Storage Blob Contributor access and I am able to read sample data, e.g. a single parquet file at 2024/18/7/table_name.parquet.

What is the recommended way to load the data daily with automation, along with the historical data? Thank you.


Solution

  • Here, there are two approaches to consider when copying the data from the storage account to Databricks.

    1. Copying all the files into the same target in Databricks, i.e. the source files are merged into a single target parquet location in Databricks.

      Here, first copy the files for all the old dates into the Databricks target. Then, every day, union the files for the current date into the same target.

      Code for copying the files for all dates so far to the target:

      # Read every parquet file in all date folders under the mount
      # (parquet files carry their own schema, so no inferSchema option is needed)
      df1 = spark.read.parquet('/mnt/mymount/2024/*/*/*.parquet')
      # Write the combined historical data to a single target location
      df1.write.parquet('/dbfs/FileStore/tables/target.parquet')
      dbutils.fs.ls('/dbfs/FileStore/tables')
      

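      As a quick, optional check (assuming the same target path as above), you can read the merged target back and confirm the row count and schema:

      merged = spark.read.parquet('/dbfs/FileStore/tables/target.parquet')
      print(merged.count())
      merged.printSchema()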

      Now, create another notebook and use the code below, which loads the current date's files into the same location.

      from datetime import datetime

      # Build today's folder path in the source layout, e.g. 2024/18/7
      # (year/day/month, with the zero padding stripped from the month)
      temp_date = datetime.today().strftime('%Y/%d/%m')
      mydate = temp_date[0:temp_date.rfind('/')] + '/' + str(int(temp_date.split('/')[-1]))
      print(mydate)

      # Read the existing target and today's files from the mount
      mydf = spark.read.parquet('/dbfs/FileStore/tables/target.parquet')
      files_path = '/mnt/mymount/' + mydate + '/*.parquet'
      daily_df = spark.read.parquet(files_path)
      # union() returns a new DataFrame, so assign the result before displaying
      mydf = mydf.union(daily_df)
      mydf.display()
      # Append only today's files; overwriting the same path that is being read would fail
      daily_df.write.mode("append").parquet('/dbfs/FileStore/tables/target.parquet')
      

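      If this job can run before the day's folder has landed in the storage account, you may want a small guard at the top of the notebook. This is just a sketch, assuming the same mount path as above:

      # Optional guard: stop cleanly if today's folder does not exist yet
      try:
          dbutils.fs.ls('/mnt/mymount/' + mydate)
      except Exception:
          dbutils.notebook.exit('No files found for ' + mydate)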

      You need to schedule this notebook every day so that it copies the daily data; a scheduling sketch is included at the end of this answer.


    2. Copying all the files to Databricks as individual files, keeping the same folder structure as the source.

      Use the code below to first copy the files for all dates so far to the target.

      import glob

      # Collect every parquet file under the mount using the local /dbfs file API,
      # then strip the /dbfs prefix so the paths can be used with spark.read
      list_of_paths = []
      for file in glob.iglob('/dbfs/mnt/mymount/2024/**/*.parquet', recursive=True):
          list_of_paths.append(file.replace('/dbfs', ''))
      print(list_of_paths)

      # Copy each file to the target, recreating the source folder structure
      for i in list_of_paths:
          spark.read.parquet(i).write.parquet('/dbfs/FileStore/tables/' + i.replace('/mnt/mymount/', ''))
      

      This code gets all the file paths recursively and copies them to the target location, creating the same structure as the source.

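      If the mount is not reachable through the local /dbfs file API, a small recursive walk with dbutils.fs.ls gives the same list of paths. This is a sketch assuming the same mount path as above:

      def list_parquet_files(path):
          # Recursively walk a DBFS folder and collect all parquet file paths
          result = []
          for entry in dbutils.fs.ls(path):
              if entry.isDir():
                  result.extend(list_parquet_files(entry.path))
              elif entry.path.endswith('.parquet'):
                  result.append(entry.path)
          return result

      list_of_paths = [p.replace('dbfs:', '') for p in list_parquet_files('/mnt/mymount/2024/')]
      print(list_of_paths)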

      And for the daily load, use the code below in a new notebook.

      from datetime import datetime
      # Same date logic as above: today's folder in the source layout, e.g. 2024/18/7
      temp_date = datetime.today().strftime('%Y/%d/%m')
      mydate = temp_date[0:temp_date.rfind('/')] + '/' + str(int(temp_date.split('/')[-1]))

      # List today's files on the mount (dbutils returns paths with a dbfs: prefix)
      files_today_path = '/mnt/mymount/' + mydate
      list_today_paths = []
      for i in dbutils.fs.ls(files_today_path):
          list_today_paths.append(i.path)
      print(list_today_paths)

      # Copy today's files, keeping the same folder structure in the target
      for i in list_today_paths:
          spark.read.parquet(i).write.mode("overwrite").parquet('/dbfs/FileStore/tables/' + i.replace('dbfs:/mnt/mymount/', ''))
      

      The current-date folder logic from the first approach is reused at the top of this code. Schedule this notebook every day as well; see the scheduling sketch below.
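  • To automate either notebook, attach it to a Databricks job with a daily schedule. You can create the job from the Workflows UI, or with a small call to the Jobs API as in the sketch below. This is a minimal sketch assuming Jobs API 2.1; the workspace URL, personal access token, cluster id, and notebook path are placeholders you need to replace with your own values.

    import requests

    workspace_url = 'https://<your-workspace>.azuredatabricks.net'  # placeholder
    token = '<personal-access-token>'                               # placeholder

    job_spec = {
        "name": "daily-blob-to-databricks-load",
        "tasks": [{
            "task_key": "daily_load",
            "notebook_task": {"notebook_path": "/Users/<you>/daily_load"},  # placeholder
            "existing_cluster_id": "<cluster-id>"                           # placeholder
        }],
        # Run every day at 01:00 UTC (Quartz cron syntax)
        "schedule": {
            "quartz_cron_expression": "0 0 1 * * ?",
            "timezone_id": "UTC",
            "pause_status": "UNPAUSED"
        }
    }

    resp = requests.post(workspace_url + '/api/2.1/jobs/create',
                         headers={'Authorization': 'Bearer ' + token},
                         json=job_spec)
    print(resp.json())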