I've been tasked to ingest flat files from data lake storage.
There are multiple files, all stored in the same logical folder. The contents and structure of these files differ. Each time a new file is added with the same structure as a previous one, the filename will be different, e.g.
filename_1.csv
contents structure
col1, col2, col3
The next time the same file is uploaded with different data, it could be called
january_new-data-1.csv
I've created the sink tables for each file.
How can I create ADF pipelines to ingest these files dynamically? Is it even possible?
I'm thinking these files need to be separated into their own logical folders first, yes?
Yes, we can ingest these files dynamically, and they do not need to be separated into their own logical folders first. I created a simple test in which one pipeline copies each file to a different table according to the number of columns in the file.
I created two tables, one has 3 columns and the other one has 4 columns:
CREATE TABLE [dbo].[emp_stage](
[id] [int] NULL,
[name] [nvarchar](max) NULL,
[age] [nvarchar](max) NULL
)
CREATE TABLE [dbo].[Entities](
[id] [int] NULL,
[name] [varchar](25) NULL,
[age] [int] NULL,
[city] [varchar](100) NULL
)
In ADF, a Switch activity can choose between different Copy activities based on the number of columns in each file.
I have two csv files in my data lake "input" container. emp.csv contains 3 columns, and its corresponding table is [dbo].[emp_stage]. january_new-data-1.csv contains 4 columns, and its corresponding table is [dbo].[Entities].
So I created a dataset pointing to the "input" container. Add the dynamic content *.csv and select First row as header.
Use the Get Metadata1 activity to get the Child Items, i.e. the list of files in the container.
Use the Foreach1 activity and add the dynamic content @activity('Get Metadata1').output.childItems.
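To make the iteration concrete, here is a small Python sketch (not ADF code) of what Foreach1 walks over. It assumes the childItems output is a list of objects with a name and a type field; the file names are the two from this test, and the helper file_names is a hypothetical name used only for illustration. The filter on type is an extra safeguard that the pipeline above does not have.

```python
# Illustrative shape of the Get Metadata "childItems" output:
# a list of objects, each with a "name" and a "type" field.
child_items = [
    {"name": "emp.csv", "type": "File"},
    {"name": "january_new-data-1.csv", "type": "File"},
]


def file_names(items):
    """Mimic ForEach over childItems: collect each file's name (like @item().name).

    Skipping non-file entries is an assumption added here, not part of the
    original pipeline.
    """
    return [item["name"] for item in items if item["type"] == "File"]


names = file_names(child_items)
print(names)
```

Each name in that list plays the role of @item().name in one loop iteration.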
Inside the Foreach1 activity, we can create a dataset and key in the dynamic content @item().name. This passes the file name dynamically to the Get Metadata2 activity, so we can dynamically specify a file in the container.
Use the Switch1 activity and add the expression @string(activity('Get Metadata2').output.columnCount), which converts the output of the Get Metadata2 activity to a string. The Get Metadata2 activity gets the column count of the specified file.
There are two possible values here, '3' and '4'. '3' is handled by the default branch, which runs Copy activity1. When the case is '4', Copy activity2 runs. Copy activity1 and Copy activity2 use the same data source, the one defined previously at Step 5, but they sink to different tables.
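The routing can also be simulated locally. The following Python sketch is not ADF code, just a model of the Switch logic under stated assumptions: the sample CSV contents are made up, and column_count and pick_table are hypothetical helper names standing in for Get Metadata2 and Switch1.

```python
import csv
import io

# Column count (as a string, like the Switch expression) -> sink table.
CASES = {"4": "[dbo].[Entities]"}
DEFAULT_TABLE = "[dbo].[emp_stage]"  # default branch, used for the 3-column file


def column_count(csv_text):
    """Count columns in the header row (stand-in for Get Metadata2's columnCount)."""
    header = next(csv.reader(io.StringIO(csv_text)))
    return len(header)


def pick_table(csv_text):
    """Mimic Switch1: stringify the count, match a case, else use the default."""
    key = str(column_count(csv_text))
    return CASES.get(key, DEFAULT_TABLE)


# Hypothetical sample contents for the two files in this test.
files = {
    "emp.csv": "id,name,age\n1,Ann,30\n",
    "january_new-data-1.csv": "id,name,age,city\n1,Bob,41,Oslo\n",
}

routing = {name: pick_table(text) for name, text in files.items()}
for name, table in routing.items():
    print(name, "->", table)
```

Note that the default branch catches every count other than '4', so a file with an unexpected number of columns would also be sent to [dbo].[emp_stage]. Adding an explicit '3' case and letting the default raise an error is one way to guard against that.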