
How to fetch distinct dates from a CSV file and iterate a deletion query in an Azure Data Factory pipeline


I want to use ADF to import paycheck data from a CSV file located in Azure Blob Storage into an Azure database. Since paycheck schedules are fixed, if a batch for a given PaycheckDate is already in the table, the existing data for that PaycheckDate should be cleaned up before importing, and the (new) entries in the file should then be imported for that PaycheckDate. (There is no unique identifier in the file to pick out a particular entry.)

There is a CSV file, for example Paycheck_202402.csv, in the PaycheckContainer container of Blob Storage; here are its schema and sample data (about 28K rows in a file):

CompanyId,EmployeeId,PaycheckDate,Amount,Description
100,e1234,2024-02-09,150.5,Overtime
100,e1234,2024-02-09,1500.0,Salary
100,e1234,2024-02-23,305.25,Overtime
100,e1234,2024-02-23,1500.0,Salary
200,e2222,2024-02-09,50.5,Tip
200,e2222,2024-02-09,500.0,Salary
400,e5555,2024-02-15,1000,Compensate
400,e5555,2024-02-23,1500.0,Salary

For the target, a table called factPaychecks, here is its simple schema:

CompanyId INT,
EmployeeId VARCHAR,
PaycheckDate DATE,
Amount DOUBLE(10,2),
Description VARCHAR

What I'd like to do is

  • to extract the unique PaycheckDates from the file into an array, so the expected result is arrCheckDates = [2024-02-09, 2024-02-15, 2024-02-23],
  • to pass arrCheckDates to a ForEach activity,
  • and then iterate a DELETE query against factPaychecks for each given PaycheckDate (see the sketch after this list)
DELETE FROM factPaychecks WHERE PaycheckDate = $PaycheckDate
  • to import the entries in the file into factPaychecks.
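
As a minimal sketch of that DELETE step, assuming a Script activity inside the ForEach (the activity choice is an assumption, not something from the original pipeline; the item property PaycheckDate matches the one used later in the answer):

-- Hypothetical query for a Script activity inside the ForEach;
-- @{item().PaycheckDate} is ADF string interpolation, resolved before the query runs
DELETE FROM factPaychecks WHERE PaycheckDate = '@{item().PaycheckDate}'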

I tried

  1. DataFlow(DFImportPCK) for extracting the PaycheckDates and sinking them to Cache

1.1. ((Source)) from CSV

1.2. ((Aggregate)) [Group by] PaycheckDate with [Aggregates] NumOfRows = count(PaycheckDate)

1.3. ((Select)) PaycheckDate only - I checked that the result is DATE type

1.4. ((Sink)) [Type] Cache, [Options] checked Write to activity output, [Key column] List of columns (no columns assigned) --> Inspect/Data Preview: DATE type (screenshot: https://i.sstatic.net/v7Atu.png)

  2. Assign the returned value to an array-type variable, varArrPaycheckDates, in the pipeline (PLImportPCK):
varArrPaycheckDates = @activity('DFImportPCK').output.runStatus.output.Sink2Cache.value

There are a couple of issues:

  1. In DataFlow(DFImportPCK), I could see only 1 entry, 2024-02-09, at ((Aggregate)):

`PaycheckDate` 2024-02-09, `NumOfRows` 1000

It is the same at the subsequent steps; even at ((Sink)) I could see 2024-02-09 only, but there should be 3 elements as above: [2024-02-09, 2024-02-15, 2024-02-23].

  2. DataFlow(DFImportPCK) returns a (LONG)INTEGER value, 1688688000000, not the 2024-02-09 DATE type.

Could you guide me on how to achieve my goal, or point out what I'm missing?

Thanks!


Solution

  • DataFlow(DFImportPCK) returns a (LONG)INTEGER value, 1688688000000, not the 2024-02-09 DATE type.

    The issue is with the date type. First, you need to cast it to a string type, as below:
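
    A minimal sketch of that cast, assuming a Derived Column transformation that overwrites the PaycheckDate column (the format string is illustrative; toString is a standard data flow expression function):

    PaycheckDate = toString(PaycheckDate, 'yyyy-MM-dd')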


    Then, in the Data flow activity settings, set the logging level to None and uncheck First row only, so that the activity output contains every row from the cache sink rather than only the first one.


    Then, in the data flow activity output, you can see an array of PaycheckDate values.

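    Based on the sample data, that output array should be shaped roughly like this (a sketch of the expected shape, not captured output; the property name follows the column in the sink):

    [
      {"PaycheckDate":"2024-02-09"},
      {"PaycheckDate":"2024-02-15"},
      {"PaycheckDate":"2024-02-23"}
    ]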

    Then add a ForEach loop to iterate over this array, with the items expression:

    @activity('Data flow1').output.runStatus.output.Sink1.value
    


    Inside the ForEach activity, use an Append variable activity with the expression @item().PaycheckDate to collect all the values into a single array.


    After the ForEach has executed, the append variable will contain all the distinct PaycheckDate values.
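
    For the sample file above, that should come out as the following (illustrative, matching the expected arrCheckDates from the question):

    ["2024-02-09","2024-02-15","2024-02-23"]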