Tags: google-bigquery, google-cloud-dataflow, apache-beam

Is it possible to execute scripts on BigQuery in Dataflow?


I have two tables: one table with data to process and a tracking table that keeps track of the data that's already been processed.

So for example, the tracking table below would indicate that we've already processed data from those two days.

date
---
3/15/23
3/16/23

If I were just running it directly in BigQuery or another database, I would do something like this (in pseudocode):

last_processed_date = max(date) from tracking table
date_to_process = last_processed_date + 1

select * from main table where date = date_to_process

insert into tracking table values(date_to_process)

The issue is that since this is a "script" and not a single query expression, I don't think I can run it with the BigQueryIO connector in Dataflow. I'm wondering if there's something I'm missing that would make this possible.


Solution

  • Solution 1:

    If possible, find a way to express the needed input logic fully in SQL, with a query that looks like this (using BigQuery's DATE_ADD to increment the date):

    select * from main_table
    where date = (
      select date_add(max(date), interval 1 day)
      from tracking_table
    );
    

    Then use that query directly as the input of your Dataflow job.
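
    For example, here is a minimal sketch of such a job with the Beam Python SDK, assuming hypothetical table names `my_project.my_dataset.main_table` and `my_project.my_dataset.tracking_table`; the "find the next date" logic lives entirely in the SQL passed to ReadFromBigQuery:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Hypothetical table names; the subquery computes last processed date + 1 day.
    QUERY = """
    SELECT m.*
    FROM `my_project.my_dataset.main_table` m
    WHERE m.date = (
      SELECT DATE_ADD(MAX(t.date), INTERVAL 1 DAY)
      FROM `my_project.my_dataset.tracking_table` t
    )
    """

    with beam.Pipeline(options=PipelineOptions()) as p:
        rows = (
            p
            | "ReadNextDay" >> beam.io.ReadFromBigQuery(
                query=QUERY, use_standard_sql=True)
            # ... your transforms here ...
        )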

    Solution 2:

    If Solution 1 is not possible, I think it's better to execute the first two queries outside of Dataflow:

    last_processed_date = max(date) from tracking table
    date_to_process = last_processed_date + 1
    

    Then pass date_to_process as a pipeline option to the Dataflow job.

    In your job, the input connector would be a BigQueryIO.read that executes the following query, parameterized by the date argument:

    select * from main table where date = date_to_process
    

    The output connector would be a BigQueryIO.write.
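
    A minimal sketch of such a job with the Beam Python SDK, assuming a custom --date_to_process option, hypothetical table names, and an output table that already exists (the usual Dataflow options such as --temp_location are passed on the command line):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class JobOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            # Custom option supplied by the launcher (shell script, Airflow, ...)
            parser.add_argument("--date_to_process", help="Date to process, e.g. 2023-03-17")

    options = PipelineOptions()
    date_to_process = options.view_as(JobOptions).date_to_process

    query = (
        "SELECT * FROM `my_project.my_dataset.main_table` "
        f"WHERE date = DATE '{date_to_process}'"
    )

    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read" >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
            # ... your transforms here ...
            | "Write" >> beam.io.WriteToBigQuery(
                "my_project:my_dataset.output_table",
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )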

    You can create a shell script that uses the bq CLI (part of the Google Cloud SDK) to execute the first query and increment the value.

    Then launch the Dataflow job with the previously calculated date.

    Example:

    # Execute the first query (CSV output; skip the header row)
    last_processed_date=$(bq query --quiet --use_legacy_sql=false --format=csv \
      'SELECT MAX(date) AS last_date FROM tracking_table' | awk '{if(NR>1)print}')

    # Increment the date by one day (GNU date)
    date_to_process=$(date -d "$last_processed_date + 1 day" +%Y-%m-%d)
    
    # Launch the Dataflow job, example with Python
    python -m your_folder.main \
            --runner=DataflowRunner \
            --region=europe-west1 \
            --date_to_process=$date_to_process \
           ....
    

    Solution 3:

    The same as Solution 2, but with Airflow.

    If you are using a pipeline orchestrator like Airflow in your project, you can apply the same logic presented in Solution 2, but with Python code and operators:

    - A first operator, BigQueryInsertJobOperator, that executes the query, gets the result, and passes it via XCom to a second operator
    - A second operator, BeamRunPythonPipelineOperator, that launches the Dataflow job, passing the date as a pipeline argument
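
    A minimal DAG sketch of that pattern (Airflow 2 with the Google and Apache Beam providers), assuming hypothetical project, dataset, and bucket names. For simplicity, this sketch fetches the scalar result with a small TaskFlow task using the google-cloud-bigquery client rather than BigQueryInsertJobOperator, and passes it through XCom to BeamRunPythonPipelineOperator:

    import pendulum
    from airflow.decorators import dag, task
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

    @dag(schedule=None, start_date=pendulum.datetime(2023, 3, 1), catchup=False)
    def process_next_date():

        @task
        def get_date_to_process() -> str:
            # Hypothetical table name; computes last processed date + 1 day.
            from google.cloud import bigquery
            client = bigquery.Client()
            query = (
                "SELECT DATE_ADD(MAX(date), INTERVAL 1 DAY) AS d "
                "FROM `my_project.my_dataset.tracking_table`"
            )
            row = next(iter(client.query(query).result()))
            return row.d.isoformat()

        run_dataflow = BeamRunPythonPipelineOperator(
            task_id="run_dataflow",
            py_file="gs://my-bucket/your_folder/main.py",  # hypothetical location
            runner="DataflowRunner",
            pipeline_options={
                "project": "my_project",
                "region": "europe-west1",
                # Pulled from the first task via XCom (Jinja templating);
                # add your usual Dataflow options (temp_location, etc.) here.
                "date_to_process": "{{ ti.xcom_pull(task_ids='get_date_to_process') }}",
            },
        )

        get_date_to_process() >> run_dataflow

    process_next_date()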