azure azure-data-factory azure-data-lake azure-data-lake-gen2

Copy files from gen2 ADLS based on acknowledgement file


I am trying to copy data from one Gen2 ADLS account into another using a Data Factory pipeline. The pipeline runs daily and copies only that day's data, which is done by providing a start and end time in the copy activity.

Some days the files in the source ADLS are delayed, so the pipeline runs but no data is copied. To track this, we plan to place an acknowledgement file in the source ADLS once the data has been written there, so that before copying we can check for the ack file and proceed with the copy only if it is present.

The check should happen every 10 minutes. If the ack file is not present, the check should run again after 10 minutes, and this should continue for 2 hours. If the file appears within those 2 hours, the data copy should proceed and the check should stop. If there is no data after 2 hours, the job should fail.
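To make the requirement concrete, here is a minimal sketch of the "check, wait, give up after N tries" behaviour in Scala. The names `AckPolling` and `pollUntil` are made up for illustration and are not part of ADF or any library; `check` stands for whatever test decides that the ack file is present.

```scala
import scala.annotation.tailrec

object AckPolling {
  /** Runs `check` up to `maxTries` times, sleeping `intervalMillis` between tries.
    * Returns true as soon as `check` succeeds, false if every try fails. */
  def pollUntil(maxTries: Int, intervalMillis: Long)(check: () => Boolean): Boolean = {
    @tailrec
    def loop(attempt: Int): Boolean =
      if (attempt > maxTries) false
      else if (check()) true
      else {
        // No need to wait after the final failed try
        if (attempt < maxTries) Thread.sleep(intervalMillis)
        loop(attempt + 1)
      }
    loop(1)
  }
}
```

With `maxTries = 12` and `intervalMillis = 10 * 60 * 1000L` this covers roughly the 2-hour window described above; the caller would fail the job when the result is `false`.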

I tried the Validation activity in ADF, but one issue is the folder name, since my folder is named with the date and timestamp of creation (e.g. 2021-03-30-02-19-33). I have to exclude the timestamp part when providing the folder name. How is this possible? Is a wildcard path accepted for the Validation activity?
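What I mean by excluding the timestamp is essentially a prefix match on the date part of the folder name. A small sketch of that idea in Scala, assuming the folder names always begin with the date as `yyyy-MM-dd` (the object and function names here are hypothetical):

```scala
import java.time.LocalDate

object FolderMatch {
  /** Date part of the folder name, e.g. "2021-03-30" (LocalDate prints as ISO yyyy-MM-dd). */
  def prefixFor(date: LocalDate): String = date.toString

  /** Keeps only the folder names that start with the given date prefix. */
  def matchingFolders(folderNames: Seq[String], datePrefix: String): Seq[String] =
    folderNames.filter(_.startsWith(datePrefix))
}
```

For a daily run, `FolderMatch.prefixFor(LocalDate.now())` would give the prefix to look for, whatever the time portion of the folder name turns out to be.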

Any leads on how to implement this?

Is there any way to implement a repeated check every 10 minutes for 2 hours with the Get Metadata activity? Can the above scenario be implemented with Get Metadata?


Solution

  • If a wildcard path is required, we have to write a Scala/Python script in a notebook and execute it from ADF.

    I used the Scala script below, which takes its input parameters from ADF.

    import java.io.File
    import java.util.Calendar
    
    dbutils.widgets.text("mainFolderPath", "","")
    dbutils.widgets.text("finalFolderStartName", "","")
    dbutils.widgets.text("fileName", "","")
    dbutils.widgets.text("noOfTry", "1","")
    val mainFolderPath = dbutils.widgets.get("mainFolderPath")
    val finalFolderStartName = dbutils.widgets.get("finalFolderStartName")
    val fileName = dbutils.widgets.get("fileName")
    val noOfTry = (dbutils.widgets.get("noOfTry")).toInt
    
    println("Main folder path : " + mainFolderPath)
    println("Final folder start name : " + finalFolderStartName)
    println("File name to be checked : " + fileName)
    println("Number of tries with a gap of 1 mins : " + noOfTry)
    
    if(mainFolderPath == "" || finalFolderStartName == "" || fileName == ""){
      // Throw rather than exit, so a run with missing parameters is reported as a failure
      throw new IllegalArgumentException("Please pass input parameters and rerun!")
    }
    
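    // Names of the immediate sub-directories of directoryName.
    // listFiles returns null if the path is missing or not a directory, hence the Option.
    def getListOfSubDirectories(directoryName: String): Array[String] = {
        Option(new File(directoryName).listFiles)
            .getOrElse(Array.empty[File])
            .filter(_.isDirectory)
            .map(_.getName)
    }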
    
    // Gap between tries: 1 minute, matching the message printed above.
    // For a check every 10 minutes over 2 hours, use 10 here and pass noOfTry = 12.
    val waitMinutes = 1
    
    var counter = 0
    var folderFound = false
    var fileFound = false
    
    while (counter < noOfTry && !fileFound) {
      // Folders whose name starts with the expected prefix (the timestamp suffix is ignored)
      val matching = getListOfSubDirectories(mainFolderPath).filter(_.startsWith(finalFolderStartName))
      folderFound = matching.nonEmpty
      if(!folderFound){
        println("folder does not exist with name : " + mainFolderPath + "/" + finalFolderStartName + "*")
      }
      matching.foreach(fol => {
        val finalPath = mainFolderPath + "/" + fol + "/" + fileName
        println("Final file path : " + finalPath)
        if(new File(finalPath).exists) {
          fileFound = true
        }
      })
      if(!fileFound){
        if(folderFound) println("found the final folder but no file found!")
        counter = counter+1
        // Sleep once per try (not once per matching folder), and not after the last try
        if(counter < noOfTry){
          println("waiting for " + waitMinutes + " min! " + Calendar.getInstance().getTime())
          Thread.sleep(waitMinutes * 60 * 1000L)
        }
      }
    }
    
    // Throwing here fails the notebook run, which in turn fails the calling ADF activity
    if(folderFound && fileFound){
      println("File Exists!")
    }else{
      throw new Exception("File does not exist!")
    }
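
The core of the check can also be tried outside the notebook. Below is a minimal sketch with no `dbutils` dependency; `AckCheck` and `ackFileExists` are hypothetical names introduced only for this example. Note that `java.io.File` only sees local paths, so on Databricks this approach assumes the storage is mounted and addressed through a local-style path (typically one under `/dbfs/...`).

```scala
import java.io.File

object AckCheck {
  /** True if any sub-folder of `mainFolder` whose name starts with `folderPrefix`
    * contains a file called `fileName`. */
  def ackFileExists(mainFolder: File, folderPrefix: String, fileName: String): Boolean = {
    // listFiles returns null when the path is missing or not a directory
    val children = Option(mainFolder.listFiles).getOrElse(Array.empty[File])
    children
      .filter(f => f.isDirectory && f.getName.startsWith(folderPrefix))
      .exists(dir => new File(dir, fileName).exists)
  }
}
```

Pointing it at a temporary directory that contains a folder such as `2021-03-30-02-19-33` is a quick way to confirm that the prefix matching behaves as expected before wiring the notebook into the pipeline.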