Tags: azure, azure-data-factory, etl

How to copy XML files in a folder F1 based on whether their content is present in folder F2 (disregarding file names)


We already have an Azure Data Factory in place for other processes we run and hence are trying to use it to solve the following:

What we already have:

  • ADF deployed and running, we'd just create a new pipeline/data flow;
  • Linked Service to access the required folders (which are on premises).

What we don't have at the moment but will likely be needed for the end solution:

  • Any type of storage in our resource group, be it a DB, Blob Storage, etc. We can, however, use the FS we have access to via the Linked Service, since the volume is not high and we will not use the data outside the pipeline itself.

Use case:

  • There are XML files placed in a folder "F1" that are generated and named by a system that connects to a FIX service of our local Stock Exchange. We cannot change anything about this process;
  • This is a very basic process, and since the files are named by this system without any regard to what they contain, it happens that a trade that was already downloaded gets downloaded again under a different file name, even though the contents of the XML file are the same;
  • So we need a way to take such files and store some information about them somewhere we can look up later, to decide whether we need to send each "new file" being fed to us or ignore it as a duplicate. The current "ETL" (which is to be decommissioned by the solution discussed here) hashes the contents of each file, but that is overkill, as there is at least one tag within each XML that uniquely identifies it;
  • Ultimately, we also need to zip the collection of XML files that are not duplicates and send it downstream to another system by placing this zip in a folder "F2";
  • Each day we start fresh, with no need to look up whether a trade reported today was already reported yesterday, so we can wipe any storage solution clean at the end or beginning of each day.

Where I am at: from everything I have read, I am not sure all of this can be done inside ADF. Every case I have read so far points to having to deploy at least an Azure Function or a Logic Apps instance to extract and/or compare the XML content relevant to the "duplicate or not" assessment. That is possible but undesirable (extra costs, red tape around getting such a solution approved, etc.), so it would be best if we could solve this without deploying anything other than ADF. I also find it odd that ADF cannot do this by itself.

As for the storage solution, since it only exists for this process and nothing will be reused past each day or in any other process, I cannot judge what would be best to use either (a DB? Blob storage? the FS itself?), but I would stick with the FS if it does not create any issues.

I'm just getting started with Azure products, hence the overall confusion, but I'll get there.


Solution

  • If every one of your files has fewer than 5000 rows (the row limit of the Lookup activity), then you can try the pipeline design below; the key expressions it relies on are sketched after the step list. It requires two pipelines, a parent and a child pipeline.

    (Parent pipeline)
    - Get Metadata activity 1 - child items - work out your pipeline run interval and set the "Filter by last modified" start date accordingly, so that it selects only the files modified after that date.
    - Get Metadata activity 2 - get all child items.
    
    - ForEach - iterate over the Get Metadata activity 1 child items.
        - Filter activity - filter out the current file name `item().name` from the Get Metadata activity 2 child items array.
        - Execute Pipeline activity (child pipeline) - pass the current file name and the Filter output array as parameters to the child pipeline.
            (Child pipeline)
            - Lookup activity 1 - look up the passed file name and get its XML content as a JSON array.
            - Set Variable activity - create a boolean variable `Flag` with its value set to false.
            - ForEach - iterate over the passed filter output array.
                - Lookup activity 2 - get the content of the current file in this loop.
                - Set Variable activity (int type) - get the length of the intersection of the Lookup 1 and Lookup 2 arrays with an expression like `@length(intersection(lookup1 array, lookup2 array))`.
                - If activity - check whether this length is greater than 1 or not.
                    - True activities
                    - Set Variable activity - set the boolean variable `Flag` to true.
            - If activity - check whether the `Flag` boolean is true or not.
                - False activities
                - Copy activity - copy the passed file to a `temp` folder.
    - Copy activity - copy all files from the `temp` folder to the F2 folder as a .zip file. Use Binary datasets for source and sink. Use a wildcard file path in the source and `ZipDeflate (.zip)` compression in the sink dataset.
    - Delete activity - delete all files in the `temp` folder using a wildcard file path. Use a Binary dataset for this.
    - Delete activity - if you want to clean out the old files in the F1 folder before the next pipeline run, delete all files in the F1 folder in the same way as the previous step.
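
    For reference, here is a rough sketch of the kind of expressions the steps above rely on. The activity names (`Get Metadata 1`, `Get Metadata 2`, `Filter1`, `Lookup1`, `Lookup2`), the `Flag` variable and the child pipeline parameters are placeholders matching the outline, and the last-modified window assumes a daily trigger; adjust them to your own names and schedule.

        Get Metadata 1 - "Filter by last modified" start time (assuming a daily run):
            @addDays(utcNow(), -1)

        ForEach (parent) - Items:
            @activity('Get Metadata 1').output.childItems

        Execute Pipeline - parameters passed to the child pipeline:
            currentFile : @item().name
            otherFiles  : @activity('Filter1').output.Value

        Child pipeline - length of the intersection of the two lookups:
            @length(intersection(activity('Lookup1').output.value, activity('Lookup2').output.value))

        Child pipeline - final If condition on the duplicate flag:
            @equals(variables('Flag'), true)

    The exact output property names (`childItems`, `Value`, `value`) can be confirmed from each activity's output JSON in a debug run.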
    

    Here, you need to use dataset parameters for the datasets that are used inside the for-each loops or in the child pipeline. Check this SO answer to learn about the usage of dataset parameters in ADF pipelines.
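
    As a rough illustration, a parameterized XML dataset for the F1 folder could look something like the sketch below. The dataset name `DS_F1_Xml`, the linked service name `LS_OnPremFileServer` and the `FileServerLocation` type are assumptions based on an on-premises file-system linked service; adjust them to your environment.

        {
            "name": "DS_F1_Xml",
            "properties": {
                "type": "Xml",
                "linkedServiceName": {
                    "referenceName": "LS_OnPremFileServer",
                    "type": "LinkedServiceReference"
                },
                "parameters": {
                    "fileName": { "type": "string" }
                },
                "typeProperties": {
                    "location": {
                        "type": "FileServerLocation",
                        "folderPath": "F1",
                        "fileName": {
                            "value": "@dataset().fileName",
                            "type": "Expression"
                        }
                    }
                }
            }
        }

    The activities inside the loops and in the child pipeline then supply the `fileName` value per iteration, for example with `@item().name` or `@pipeline().parameters.currentFile`.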

    If the files are larger than that row limit, then it is better to use other services like Azure Functions or Logic Apps.