google-cloud-platform, google-bigquery, terraform

How to check whether a BigQuery table is not populated and publish a failure message to the Slack channel?


I need to build a solution (using Terraform) that checks daily that a BigQuery table is not empty and, if it is empty, publishes an alert to our alerts Slack channel. The difficulty is that the scheduled query configuration offers only 2 notification options:

  1. send a message by email
  2. publish a message to a Pub/Sub topic

How can I implement this using a scheduled query, possibly with additional configuration?

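I'm using the following query to check whether the table is populated. The assertion compares the row count with zero, because an EXISTS over SELECT COUNT(*) is always true (the aggregate always returns one row):
ASSERT (SELECT COUNT(*) FROM `${var.project_id}.${local.dataset_name}.${local.table_name}` WHERE DATE(UPDATED_AT) < DATE(CURRENT_TIMESTAMP())) > 0 AS "Table has no records"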


Solution

  • After going through the GCP documentation, I arrived at the following flow.

    A scheduled query runs daily and checks the BigQuery table with the assertion query from the question. If the table is empty, the assertion fails and the scheduled query publishes a message to GCP Pub/Sub. A GCP Cloud Function written in Python is triggered by the Pub/Sub cloud event; when a new message arrives, it consumes the message and posts it to the corresponding Slack channel.

    What should be done:

    1. Create a Pub/Sub topic using Terraform
    2. Create a Scheduled query using Terraform
    3. Create a Cloud Function using Python
    4. Create a Deployment for Cloud Function using GitHub Actions

    Create a Pub/Sub topic using Terraform

    resource "google_pubsub_topic" "topic-name" {
      name    = "pubsub-topic-name"
      project = var.project_id
    }
    

    Create a Scheduled query using Terraform

    resource "google_bigquery_data_transfer_config" "scheduled_query_name" {
      depends_on = [module.bigquery_dataset]

      display_name         = "Check if BigQuery table is not populated"
      location             = var.euw4_region
      service_account_name = module.service_account_name.email
      data_source_id       = "scheduled_query"
      schedule             = "every day 08:00"
      project              = var.project_id

      params = {
        # SELECT COUNT(*) always returns one row, so EXISTS over it is always true.
        # Comparing the count with zero makes the assertion fail only when no rows match.
        query = "ASSERT (SELECT COUNT(*) FROM `${var.project_id}.${local.dataset_name}.${local.table_name}` WHERE DATE(UPDATED_AT) < DATE(CURRENT_TIMESTAMP())) > 0 AS \"Table has no records\""
      }

      # Reference the topic resource defined earlier so the names cannot drift apart.
      notification_pubsub_topic = google_pubsub_topic.topic-name.id
    }
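Before scheduling the check, it can help to run the assertion once by hand. The following is a minimal sketch, assuming the `google-cloud-bigquery` client library is installed and application default credentials are configured; `build_assert_query` and `run_check` are hypothetical helper names, not part of the original setup. It compares the row count with zero rather than wrapping `COUNT(*)` in `EXISTS`, since an aggregate without `GROUP BY` always returns one row.

```python
def build_assert_query(project_id: str, dataset: str, table: str) -> str:
    """Return an ASSERT statement that fails when the filtered row count is zero."""
    table_ref = f"`{project_id}.{dataset}.{table}`"
    return (
        f"ASSERT (SELECT COUNT(*) FROM {table_ref} "
        "WHERE DATE(UPDATED_AT) < DATE(CURRENT_TIMESTAMP())) > 0 "
        'AS "Table has no records"'
    )


def run_check(project_id: str, dataset: str, table: str) -> bool:
    """Run the assertion once; True means it passed."""
    # Imported lazily so the query builder works without the client library.
    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    try:
        client.query(build_assert_query(project_id, dataset, table)).result()
        return True
    except GoogleAPICallError as exc:
        # Assumption: a failed ASSERT surfaces as an API error on the query job.
        print(f"Assertion failed: {exc}")
        return False
```

Calling `run_check("my-project", "my_dataset", "my_table")` (placeholder names) against an empty table should return `False` if the assumption about error handling holds.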
    

    Create a Cloud Function using Python

    import base64
    import os

    import functions_framework
    from slack_sdk import WebClient

    # The Slack bot token is injected as an environment variable at deploy time.
    slack_token = os.environ["SLACK_TOKEN"]
    client = WebClient(token=slack_token)


    @functions_framework.cloud_event
    def process_pubsub_message(event):
        # Pub/Sub delivers the message body base64-encoded under message.data.
        data: dict = event.data
        slack_text = base64.b64decode(data["message"]["data"]).decode("utf-8")
        slack_channel_id = "your_channel_id"
        client.chat_postMessage(channel=slack_channel_id, text=slack_text)
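The function above forwards whatever text arrives on the topic. The BigQuery Data Transfer Service documentation describes run notifications as being published when a run completes, so the topic may also receive messages for successful runs. If that is the case, a filter along the following lines could keep Slack quiet unless the run failed. This is only a sketch: `build_alert_text` is a hypothetical helper, and the `state`, `name`, and `errorStatus.message` fields are assumed to mirror the TransferRun resource.

```python
import base64
import json
from typing import Optional


def build_alert_text(event_data: dict) -> Optional[str]:
    """Return Slack text for a failed run, or None when no alert is needed."""
    raw = base64.b64decode(event_data["message"]["data"]).decode("utf-8")
    try:
        run = json.loads(raw)
    except json.JSONDecodeError:
        # Not JSON: forward the text unchanged, as the original function does.
        return raw
    if not isinstance(run, dict):
        return raw
    # Assumption: the payload mirrors a TransferRun resource with a "state" field.
    if run.get("state") != "FAILED":
        return None
    reason = run.get("errorStatus", {}).get("message", "no error message provided")
    name = run.get("name", "unknown run")
    return f":warning: Scheduled query failed ({name}): {reason}"
```

Inside `process_pubsub_message`, the result would replace `slack_text`, with the call to `chat_postMessage` skipped when it is `None`.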
    

    Create a Deployment for Cloud Function using GitHub Actions

    gcloud functions deploy "${{ env.CLOUD_FUNCTION_NAME }}" \
      --region "${{ env.REGION }}" \
      --entry-point process_pubsub_message \
      --gen2 \
      --runtime python312 \
      --timeout 60s \
      --project ${{ vars.GCP_PROJECT_ID }} \
      --service-account your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
      --build-service-account projects/${{ vars.GCP_PROJECT_ID }}/serviceAccounts/your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
      --run-service-account your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
      --source ./CloudFunction \
      --trigger-topic pubsub-topic-name \
      --set-env-vars SLACK_TOKEN=${{ secrets.SLACK_TOKEN }}
    

    Also, don't forget to define SLACK_TOKEN in the GitHub repository secrets.
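Once the function is deployed, one way to exercise the Pub/Sub-to-Slack path without waiting for the daily run is to publish a message to the topic by hand. The sketch below assumes the `google-cloud-pubsub` library is installed and that the caller may publish to the topic; `build_test_payload` and `publish_test_message` are hypothetical helper names, and the payload is plain text so the function forwards it as-is.

```python
def build_test_payload(note: str = "Test alert: table has no records") -> bytes:
    """Encode a plain-text test message as UTF-8 bytes for Pub/Sub."""
    return note.encode("utf-8")


def publish_test_message(project_id: str, topic_id: str) -> str:
    """Publish the test payload and return the Pub/Sub message ID."""
    # Imported lazily so the payload helper works without google-cloud-pubsub.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic_path, data=build_test_payload())
    # Block until Pub/Sub acknowledges the message or the timeout elapses.
    return future.result(timeout=30)
```

For example, `publish_test_message("my-project", "pubsub-topic-name")` (with a placeholder project ID) should cause the test text to appear in the Slack channel if the trigger and token are set up correctly.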