google-cloud-platform · google-bigquery · google-cloud-storage · cloud · google-cloud-pubsub

BigQuery Table Exports


I am looking for the best pattern to execute a BigQuery query and export the result to a Cloud Storage bucket. I would like this to run whenever the BigQuery table is written to or modified.

Traditionally, I think I would set up a Pub/Sub topic that is written to when the table is modified, which would trigger a Cloud Function responsible for executing the query and writing the result to a Cloud Storage bucket. I am just not confident there isn't a better (more straightforward) approach to do this in GCP.

Thanks in advance.


Solution

  • I propose an approach based on Eventarc.

    The goal is to launch a Cloud Function or Cloud Run action when data is inserted or updated in a BigQuery table. Example with Cloud Run:

    # Build and deploy the Cloud Run service (the source directory must contain a Dockerfile)
    SERVICE=bq-cloud-run
    PROJECT=$(gcloud config get-value project)
    CONTAINER="gcr.io/${PROJECT}/${SERVICE}"
    gcloud builds submit --tag ${CONTAINER}
    gcloud run deploy ${SERVICE} --image $CONTAINER --platform managed
    
    # Create the Eventarc trigger (set REGION and SVC_ACCOUNT, the trigger's service account email, beforehand)
    gcloud eventarc triggers create ${SERVICE}-trigger \
      --location ${REGION} --service-account ${SVC_ACCOUNT} \
      --destination-run-service ${SERVICE}  \
      --event-filters type=google.cloud.audit.log.v1.written \
      --event-filters methodName=google.cloud.bigquery.v2.JobService.InsertJob \
      --event-filters serviceName=bigquery.googleapis.com
    

    When a BigQuery job is executed, the Cloud Run action will be triggered.

    Example of a Cloud Run action:

    from flask import Flask, request

    app = Flask(__name__)

    @app.route('/', methods=['POST'])
    def index():
        # Gets the payload data from the Audit Log
        content = request.json
        try:
            ds = content['resource']['labels']['dataset_id']
            proj = content['resource']['labels']['project_id']
            tbl = content['protoPayload']['resourceName']
            rows = int(content['protoPayload']['metadata']
                       ['tableDataChange']['insertedRowsCount'])
            if ds == 'cloud_run_tmp' and \
               tbl.endswith('tables/cloud_run_trigger') and rows > 0:
                # Run the aggregation query (create_agg is defined elsewhere)
                query = create_agg()
                return "table created", 200
        except (KeyError, TypeError):
            # If these fields are not in the JSON, ignore the event
            pass
        return "ok", 200
    

    You can apply logic based on the current dataset, table, or other elements present in the payload.
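
    To cover the export part of the original question, the Cloud Run action could also extract the resulting table to a Cloud Storage bucket. A minimal sketch, assuming a destination bucket and the aggregated table from the previous step (bucket, project, and table names are placeholders):

    from google.cloud import bigquery

    def export_to_gcs():
        # Hypothetical export step: write the aggregated table to a
        # Cloud Storage bucket as sharded CSV files.
        client = bigquery.Client()
        destination_uri = "gs://my-export-bucket/agg_result-*.csv"
        extract_job = client.extract_table(
            "your-project.cloud_run_tmp.agg_result",  # placeholder table ID
            destination_uri,
            location="US",  # must match the dataset location
        )
        extract_job.result()  # wait for the export to complete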