I'm really new to Dataform and, going through the documentation, I haven't found a way to do what I'm trying to do: configure my incremental query so that I can schedule it to run daily and have it look only at the latest data in my raw data table.
Assume my raw_data table sits in BigQuery and that I want to transform that data and load it into a new partitioned and clustered table, call it Transformed, but only for the most recently uploaded rows of raw_data.
I know how to do this by hard-coding yesterday's date:
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(ts)",
    clusterBy: ["itemName"]
  }
}
pre_operations {
  declare event_timestamp_checkpoint default (
    ${when(incremental(),
      `select max(ts) from ${self()}`,
      `select timestamp("2021-10-06")`)}
  )
}
SELECT distinct
  timestamp as ts,
  storeName,
  DATE(timestamp) as Date,
  itemId
  .......
To run it daily without having to change the date manually, I tried this:
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(ts)",
    clusterBy: ["itemName"]
  }
}
pre_operations {
  declare event_timestamp_checkpoint default (
    ${when(incremental(),
      `select max(ts) from ${self()}`,
      `select timestamp(DATE(max(ts)-1))}
  )
}
thus replacing the hard-coded date 2021-10-06 by DATE(max(ts)-1). Obviously, this didn't work.
Any ideas on how to solve this?
As I understand it, you want to add all the new data from the source table to your new table. Clustering and partitioning do not affect how you do that; the only difference is that you have to declare that you want these features in the config of the new table.
The expression you pass to when(incremental(), ...) is compiled into the WHERE filter of your SQL query only when the destination table already exists and the pipeline is adding data to it. On the first run it is left out, so the whole source table is loaded.
You do not need to filter on the previous day's timestamp; you just need to import all the data that is newer than the newest data in the destination table.
So your code should be something like this:
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(ts)",
    clusterBy: ["itemName"]
  }
}
select ts, storeName, itemId, itemName
from <dataset>.source_table
${when(incremental(), `where ts > (select max(ts) from ${self()})`)}
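For illustration, this is roughly what the SELECT part of the query above compiles to in each case. It is a sketch, not actual compiler output: `<project>` and `<destination_table>` are placeholders for whatever ${self()} resolves to in your project, and Dataform wraps the SELECT in its own create/insert statements, which are not shown here.

```sql
-- First run, or a run with full refresh: incremental() is false,
-- so when() contributes an empty string and no filter is applied.
select ts, storeName, itemId, itemName
from <dataset>.source_table;

-- Later runs: incremental() is true, so the WHERE clause is included
-- and ${self()} is replaced by the destination table.
select ts, storeName, itemId, itemName
from <dataset>.source_table
where ts > (select max(ts) from `<project>.<dataset>.<destination_table>`);
```

Note that if the destination table exists but is empty, max(ts) is NULL and the comparison filters out every row, so a full refresh is needed in that situation.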
Every time you run this pipeline, the destination table will be updated with the new data. You can transform the data however you want in the query.
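If you would rather keep the checkpoint variable from the question, the same idea can be written with pre_operations. This is a minimal sketch under a few assumptions: the source has a ts column, it is declared in Dataform so that ref("source_table") resolves, and the fallback date 2000-01-01 is just an arbitrary early value chosen so that the first run loads everything.

```sql
config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(ts)",
    clusterBy: ["itemName"]
  }
}

pre_operations {
  -- Incremental run: the newest ts already loaded into this table.
  -- First run or full refresh: an arbitrary early timestamp (assumption).
  declare event_timestamp_checkpoint default (
    ${when(incremental(),
      `select max(ts) from ${self()}`,
      `select timestamp("2000-01-01")`)}
  )
}

-- Assumes source_table is declared as a Dataform source.
select ts, storeName, itemId, itemName
from ${ref("source_table")}
where ts > event_timestamp_checkpoint
```

Either form avoids any hard-coded date that has to be edited by hand: the checkpoint always comes from the destination table itself, so a daily schedule picks up whatever arrived since the previous run.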
Here are the details of my sandbox:
Source table:
Data in it:
Running the pipeline in Dataform:
If you add new data to the source table, only the new rows will be added to the destination table:
insert into `<bucket>.<dataset>.source_table` values ('2021-10-20 00:00:00 UTC','zabka',3,'woda3')
Run the pipeline again (or schedule it) and the new data is added:
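One way to check the result yourself is to count the rows per partition date in the destination table before and after the run. This is only a sketch: `<destination_table>` is a placeholder for the table your SQLX file creates, and the column aliases are made up for the example.

```sql
-- <destination_table> is a placeholder, not a name from the sandbox above.
select
  date(ts) as ts_date,     -- partition date of each row
  count(*) as row_count,   -- rows loaded for that date
  max(ts) as newest_ts     -- value the next incremental run compares against
from `<bucket>.<dataset>.<destination_table>`
group by ts_date
order by ts_date desc
```

After the insert above and a second run, a row for 2021-10-20 should appear while the counts for earlier dates stay the same.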
You can find more details about how to create incremental tables in the Dataform docs.