I have two tables: one table with data to process and a tracking table that keeps track of the data that's already been processed.
So for example, the tracking table below would indicate that we've already processed data from those two days.
date
---
3/15/23
3/16/23
If I were just running it directly in BigQuery or another database, I would do something like this (in pseudocode)
last_processed_date = max(date) from tracking table
date_to_process = last_processed_date + 1
select * from main table where date = date_to_process
insert into tracking table values(date_to_process)
The issue is that since this is a "script" and not a single expression, I don't think I can run it with the BigQueryIO connector in Dataflow. I'm wondering if there's something I'm missing that would make this possible.
Solution 1: if it's possible, apply the needed logic for the input fully in SQL, with a query that looks like:

SELECT *
FROM main_table
WHERE date = (
  SELECT DATE_ADD(maxDate, INTERVAL 1 DAY)
  FROM (SELECT MAX(date) AS maxDate FROM tracking_table)
);

Then pass this query as the input to your Dataflow job.
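For example, with the Beam Python SDK, a minimal sketch of passing that combined query to the BigQuery connector could look like the following (dataset and table names are placeholders; reading from a query also needs a temporary GCS location or dataset for the export):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Combined query: the date to process is computed by the subquery itself.
QUERY = """
SELECT *
FROM `my_dataset.main_table`
WHERE date = (
  SELECT DATE_ADD(MAX(date), INTERVAL 1 DAY)
  FROM `my_dataset.tracking_table`
)
"""

def run():
    # Project, region, temp_location, etc. come from the command line.
    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        rows = p | "ReadMainTable" >> beam.io.ReadFromBigQuery(
            query=QUERY, use_standard_sql=True
        )
        # ... downstream transforms and the BigQuery write go here.

if __name__ == "__main__":
    run()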
Solution 2: if solution 1 is not possible, I think it's better to execute the first two queries outside of Dataflow:

last_processed_date = max(date) from tracking table
date_to_process = last_processed_date + 1

Then pass date_to_process as a pipeline option to the Dataflow job.
In your job, the input connector would be a BigQueryIO read that executes the following query, parameterized by the date argument:

select * from main table where date = date_to_process

The output connector would be a BigQueryIO write.
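As a rough sketch in the Beam Python SDK (dataset, table, and option names are illustrative), the date_to_process pipeline option can be declared and used like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class JobOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Custom option passed at launch time, e.g. --date_to_process=2023-03-17
        parser.add_argument("--date_to_process", required=True)

def run():
    options = PipelineOptions()
    date_to_process = options.view_as(JobOptions).date_to_process

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromBigQuery(
                query=f"SELECT * FROM `my_dataset.main_table` "
                      f"WHERE date = '{date_to_process}'",
                use_standard_sql=True,
            )
            | "Write" >> beam.io.WriteToBigQuery(
                "my_project:my_dataset.output_table",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            )
        )

if __name__ == "__main__":
    run()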
You can create a shell script using the gcloud CLI and bq to execute the first query and increment the value, then launch the Dataflow job with the previously calculated date.

Example:
# Execute the first query to get the last processed date
last_processed_date=$(bq query --quiet --use_legacy_sql=false --format=csv \
  'SELECT MAX(date) FROM tracking_table' | awk '{if(NR>1)print}')

# Increment the date by one day (GNU date syntax)
date_to_process=$(date -d "$last_processed_date + 1 day" +%Y-%m-%d)

# Launch the Dataflow job, example with Python
python -m your_folder.main \
  --runner=DataflowRunner \
  --region=europe-west1 \
  --date_to_process=$date_to_process \
  ....
Solution 3: the same as solution 2, but with Airflow.

If you are using a pipeline orchestrator like Airflow in your project, you can apply the same logic presented in solution 2, but with Python code and operators:

- A first operator, BigQueryInsertJobOperator, that executes the query, gets the result, and passes it via XCom to a second operator
- A second operator, BeamRunPythonPipelineOperator, that launches the Dataflow job, passing the date as a pipeline argument (see the sketch after this list)
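A minimal sketch of such a DAG, assuming a recent Airflow 2 release with the Google and Apache Beam providers installed. Dataset, bucket, and file names are placeholders; instead of reading the result back from BigQueryInsertJobOperator (which by default pushes the job id to XCom rather than the query result), a TaskFlow task using BigQueryHook is assumed here to return the computed date, and passing that value into pipeline_options relies on the field being templated.

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

with DAG(
    dag_id="incremental_main_table_to_dataflow",
    start_date=pendulum.datetime(2023, 3, 15, tz="UTC"),
    schedule="@daily",
    catchup=False,
):

    @task
    def compute_date_to_process() -> str:
        # Run the "first query" with the BigQuery hook and return the
        # incremented date; the return value is pushed to XCom automatically.
        from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

        hook = BigQueryHook(use_legacy_sql=False)
        row = hook.get_first(
            "SELECT DATE_ADD(MAX(date), INTERVAL 1 DAY) FROM `my_dataset.tracking_table`"
        )
        return str(row[0])

    BeamRunPythonPipelineOperator(
        task_id="launch_dataflow",
        py_file="gs://my-bucket/your_folder/main.py",
        runner="DataflowRunner",
        pipeline_options={
            "region": "europe-west1",
            # The XCom value from the previous task becomes --date_to_process.
            "date_to_process": compute_date_to_process(),
        },
    )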