Tags: azure, azure-blob-storage, azure-storage, azure-data-factory, azure-data-lake

How to ingest/copy files from two rotating Azure Storage Account containers into another container


Given the following setup:

Source: an Azure StorageV2 account with two containers named A and B, each holding blob files stored flat in the container's root directory.

Destination: an Azure Data Lake Storage Gen2 account (for simplification purposes, consider it another Storage Account with a single destination container).

Objective: I am trying to copy/ingest all files within the currently active source container at the top of the month. For the remainder of that month, any files newly added or overwritten inside the active source container need to be ingested as well.

For each month, there will only be one active container that we care about. So January would use Container A, Feb would use Container B, March would use Container A, etc. Using Azure Data Factory, I’ve already figured out how to accomplish this logic of swapping containers by using a dynamic expression in the file path.

@if(equals(mod(int(formatDateTime(utcnow(),'%M')), 2), 0), 'containerB', 'containerA')

(For example, in February '%M' yields 2 and mod(2, 2) equals 0, so containerB is selected; in March it yields 3 and containerA is selected.)

What I've tried so far: I set up a Copy pipeline using a Tumbling Window approach, where a trigger runs daily to check for new/changed files based on the LastModifiedDate, as described here: https://learn.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-lastmodified-copy-data-tool. However, I ran into a conundrum: the files to be ingested at the top of the month will by nature have a LastModifiedDate earlier than the start of the trigger's window, because the container is prepared ahead of time in the days leading up to the turn of the month, right before the containers are swapped. Since their LastModifiedDate falls before the trigger's start window, those pre-existing files never get copied on the 1st of the month; only files added/changed after the trigger start date are picked up. If I manually fire the trigger with a hardcoded earlier start date, then files added to the container mid-month do get ingested for the remainder of the month as expected.

So how do I solve that base case for files modified before the start date? If this can be solved, then everything can happen in one pipeline and one trigger. Otherwise, I will have to figure out another approach.

And in general, I am open to ideas as to what the best approach is here. The files will be ~2 GB in size and around 20,000 in quantity.


Solution

  • You can do this by setting your trigger to run at the end of each day and copying all of that day's new/updated files based on the last modified date, as below.

    This assumes no files are uploaded to the second container while the first container is active (the opposite case is handled at the end).

    Follow the steps below:

    • In Data Factory, drag a Copy activity into your pipeline.

    • Create the source dataset along with its linked service. Supply the container condition by clicking Add dynamic content on the source dataset:

      @if(equals(mod(int(formatDateTime(utcnow(),'%M')), 2), 0), 'containerb', 'containera')

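    If you switch the dataset to its JSON view, the dynamic container lands in the location block roughly as sketched below. This is only a sketch: the dataset and linked-service names are placeholders I have assumed, not anything from the original setup.

      {
          "name": "SourceBlobDataset",
          "properties": {
              "linkedServiceName": {
                  "referenceName": "SourceStorageLinkedService",
                  "type": "LinkedServiceReference"
              },
              "type": "Binary",
              "typeProperties": {
                  "location": {
                      "type": "AzureBlobStorageLocation",
                      "container": {
                          "value": "@if(equals(mod(int(formatDateTime(utcnow(),'%M')), 2), 0), 'containerb', 'containera')",
                          "type": "Expression"
                      }
                  }
              }
          }
      }

    Note how the expression is wrapped in a value/type pair; that is how the authoring UI stores any field set through Add dynamic content.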

    • Then select Wildcard file path as the File path type and give * as the wildcard to copy multiple files.

    • Here I am copying files that were added or updated in the last 24 hours. Go to Filter by last modified and give @adddays(utcNow(),-1) as the start time and @utcNow() as the end time.

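    In the Copy activity's JSON, the wildcard and the last-modified filter correspond roughly to the storeSettings block below. Again a sketch, assuming a Binary source over Azure Blob Storage.

      "source": {
          "type": "BinarySource",
          "storeSettings": {
              "type": "AzureBlobStorageReadSettings",
              "recursive": true,
              "wildcardFileName": "*",
              "modifiedDatetimeStart": {
                  "value": "@adddays(utcNow(),-1)",
                  "type": "Expression"
              },
              "modifiedDatetimeEnd": {
                  "value": "@utcNow()",
                  "type": "Expression"
              }
          }
      }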

    • As this is scheduled with a trigger at the end of each day, each run will look for files that were added or modified in the 24 hours before its start time.

    • Give a container in the other storage account as the sink dataset.

    • Now click Add trigger and create a Tumbling Window trigger like below.

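    In JSON, the trigger definition looks roughly like the sketch below; the trigger name, pipeline name, and start time are placeholders. A frequency of Hour with an interval of 24 produces one window per day, and maxConcurrency of 1 ensures only one window runs at a time.

      {
          "name": "DailyCopyTrigger",
          "properties": {
              "type": "TumblingWindowTrigger",
              "typeProperties": {
                  "frequency": "Hour",
                  "interval": 24,
                  "startTime": "2023-01-01T23:30:00Z",
                  "maxConcurrency": 1
              },
              "pipeline": {
                  "pipelineReference": {
                      "referenceName": "CopyActiveContainerPipeline",
                      "type": "PipelineReference"
                  }
              }
          }
      }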

    You can set the trigger's start date to whatever end-of-day time suits your pipeline's execution schedule.

    Please make sure you publish the pipeline and trigger before execution.

    If your second container also receives new/modified files while the first container is active (i.e., files are staged there before the swap, so their LastModifiedDate predates the 1st of the month), then you can try something like this as the start time of the last-modified filter:

    @if(equals(int(formatDateTime(utcNow(),'%d')), 1), adddays(utcNow(),-31), adddays(utcNow(),-1))

    (Note that '%d' returns the day of the month. On the 1st, the window reaches back 31 days to pick up files staged during the previous month, while on every other day it reaches back 24 hours as before.)
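    Plugged into the same source settings as before, this replaces the fixed one-day start time (again a sketch, same assumed Binary source as above):

      "modifiedDatetimeStart": {
          "value": "@if(equals(int(formatDateTime(utcNow(),'%d')), 1), adddays(utcNow(),-31), adddays(utcNow(),-1))",
          "type": "Expression"
      }

    Since the dataset's container expression has already switched over on the 1st, the long lookback only scans the newly active container, so it should not re-copy anything that was already ingested from the other container during the previous month.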