I need to create a solution (using Terraform) that checks on a daily basis that a BigQuery table is not empty and, if it is empty, publishes an alert to the alerts Slack channel. The difficulty is that the scheduled query configuration has only 2 options:
I'm using the following query to check whether the table is populated:
ASSERT ((SELECT COUNT(*) FROM `${var.project_id}.${local.dataset_name}.${local.table_name}` WHERE DATE(UPDATED_AT) < DATE(CURRENT_TIMESTAMP())) > 0) AS "Table has no records"
After some investigation of the GCP documentation, I found a solution based on the following flow.
A scheduled query checks the BigQuery table every day using the query above. If the table is empty, the scheduled query fails and a message is published to GCP Pub/Sub. A GCP Cloud Function written in Python is triggered by the Pub/Sub cloud event, consumes the new message, and posts it to the corresponding Slack channel.
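To make the message hand-off in this flow concrete, here is a minimal local sketch of how the payload travels: Pub/Sub delivers the body base64-encoded under message.data, and the function decodes it again. The helper names (wrap_as_pubsub_event, unwrap_pubsub_event) and the trimmed-down sample_run are hypothetical and only for illustration; a real notification carries more fields.

```python
import base64
import json


def wrap_as_pubsub_event(run: dict) -> dict:
    """Build the `data` dict of a Pub/Sub cloud event carrying `run` as JSON."""
    encoded = base64.b64encode(json.dumps(run).encode("utf-8")).decode("ascii")
    return {"message": {"data": encoded}}


def unwrap_pubsub_event(data: dict) -> dict:
    """Decode the base64 payload back into a dict, as the Cloud Function does."""
    raw = base64.b64decode(data["message"]["data"]).decode("utf-8")
    return json.loads(raw)


# Hypothetical, trimmed-down notification for a failed check (assumed shape).
sample_run = {
    "dataSourceId": "scheduled_query",
    "state": "FAILED",
    "errorStatus": {"message": "Table has no records"},
}

event_data = wrap_as_pubsub_event(sample_run)
assert unwrap_pubsub_event(event_data) == sample_run
```

A fake event built this way can be handy for exercising the decoding logic locally before deploying anything.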
What should be done:
Create a Pub/Sub topic using Terraform
resource "google_pubsub_topic" "topic-name" {
  name    = "pubsub-topic-name"
  project = var.project_id
}
Create a scheduled query using Terraform
resource "google_bigquery_data_transfer_config" "scheduled_query_name" {
  depends_on           = [module.bigquery_dataset]
  display_name         = "Check if BigQuery table is not populated"
  location             = var.euw4_region
  service_account_name = module.service_account_name.email
  data_source_id       = "scheduled_query"
  schedule             = "every day 08:00"
  params = {
    query = "ASSERT ((SELECT COUNT(*) FROM `${var.project_id}.${local.dataset_name}.${local.table_name}` WHERE DATE(UPDATED_AT) < DATE(CURRENT_TIMESTAMP())) > 0) AS \"Table has no records\""
  }
  # Reference the topic created above so the names cannot drift apart
  notification_pubsub_topic = google_pubsub_topic.topic-name.id
  project                   = var.project_id
}
Create a Cloud Function using Python
import base64
import os

import functions_framework
from slack_sdk import WebClient

# The Slack token is injected through the SLACK_TOKEN environment variable
slack_token = os.environ["SLACK_TOKEN"]
client = WebClient(token=slack_token)


@functions_framework.cloud_event
def process_pubsub_message(event):
    # The Pub/Sub payload arrives base64-encoded under message.data
    data: dict = event.data
    slack_text = base64.b64decode(data["message"]["data"]).decode("utf-8")
    slack_channel_id = "your_channel_id"
    client.chat_postMessage(channel=slack_channel_id, text=slack_text)
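One thing to watch: as far as I understand the Data Transfer Service, a run notification is published whenever a run finishes, not only when it fails, and the message body is a JSON description of the run. Assuming that payload exposes state, name and errorStatus.message fields (an assumption worth checking against a real message), a sketch like the one below would let the function stay silent on successful runs and post a shorter text on failures. The helper name slack_text_for_event and the message wording are hypothetical.

```python
import base64
import json
from typing import Optional


def slack_text_for_event(data: dict) -> Optional[str]:
    """Return a Slack message for a failed run, or None if no alert is needed."""
    raw = base64.b64decode(data["message"]["data"]).decode("utf-8")
    payload = json.loads(raw)

    # Assumed field: "state" is "FAILED" when the ASSERT in the query fails.
    if payload.get("state") != "FAILED":
        return None

    # Assumed fields: "errorStatus.message" and "name" describe the failed run.
    reason = payload.get("errorStatus", {}).get("message", "no error message provided")
    run_name = payload.get("name", "unknown run")
    return f"Scheduled query check failed ({run_name}): {reason}"
```

Inside process_pubsub_message, the result could be passed to client.chat_postMessage only when it is not None.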
Create a Deployment for Cloud Function using GitHub Actions
gcloud functions deploy "${{ env.CLOUD_FUNCTION_NAME }}" \
  --region "${{ env.REGION }}" \
  --entry-point process_pubsub_message \
  --gen2 \
  --runtime python312 \
  --timeout 60s \
  --project ${{ vars.GCP_PROJECT_ID }} \
  --service-account your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
  --build-service-account projects/${{ vars.GCP_PROJECT_ID }}/serviceAccounts/your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
  --run-service-account your-service-account-sa@${{ vars.GCP_PROJECT_ID }}.iam.gserviceaccount.com \
  --source ./CloudFunction \
  --trigger-topic pubsub-topic-name \
  --set-env-vars SLACK_TOKEN=${{ secrets.SLACK_TOKEN }}
Also, don't forget to define SLACK_TOKEN in the GitHub repository secrets.