Tags: google-cloud-platform, google-bigquery, google-workflows

Google Workflows: insert a BigQuery job that queries a federated Google Drive table


I am working on an ELT pipeline using Workflows. So far, so good. However, one of my tables is based on a Google Sheet, and that job fails with "Access Denied: BigQuery BigQuery: Permission denied while getting Drive credentials."

I know I need to add the https://www.googleapis.com/auth/drive scope to the request, and the service account used by the workflow needs access to the sheet. The access is set up correctly: an authenticated insert using curl works fine.

My conclusion is that I should add the Drive scope to the jobs.insert call. However, I do not know where or how to add it. Am I missing something?

The step in the Workflow:

        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          body:
            configuration:
              query:
                query: select * from `*****.domains_sheet_view`
                destinationTable:
                  projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                  datasetId: ***
                  tableId: domains
                createDisposition: CREATE_IF_NEEDED
                writeDisposition: WRITE_TRUNCATE
                allowLargeResults: true
                useLegacySql: false

Solution

  • AFAIK you cannot customize the OAuth scopes for connectors, but you can if you put the HTTP call together yourself.

    1. Add the service account as a Viewer on the Google Sheet.
    2. Then run the workflow.

    Here is my program:

    #workflow entrypoint
    main:
      steps:
        - initialize:
            assign:
              - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
        - makeBQJob:
            call: BQJobsInsertJobWithSheets
            args:
              project: ${project}
              configuration:
                  query:
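                    #assumes ndc.autoritati_publice is the Sheets-backed (federated) table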
                    query: SELECT * FROM `ndc.autoritati_publice` LIMIT 10
                    destinationTable:
                      projectId: ${project}
                      datasetId: ndc
                      tableId: autoritati_destination
                    createDisposition: CREATE_IF_NEEDED
                    writeDisposition: WRITE_TRUNCATE
                    allowLargeResults: true
                    useLegacySql: false
            result: res
        - final:
            return: ${res}
    #subworkflow definitions
    BQJobsInsertJobWithSheets:
      params: [project, configuration]
      steps:
        - runJob:
            try:
              call: http.post
              args:
                url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/"+project+"/jobs"}
                headers:
                  Content-type: "application/json"
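                #custom scopes: the Drive scope lets BigQuery fetch Drive credentials for the Sheet-backed table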
                auth:
                  type: OAuth2
                  scope: ["https://www.googleapis.com/auth/drive","https://www.googleapis.com/auth/cloud-platform","https://www.googleapis.com/auth/bigquery"]
                body:
                  configuration: ${configuration}
              result: queryResult
            except:
              as: e
              steps:
                - knownErrors:
                    switch:
                      - condition: ${not("HttpError" in e.tags)}
                        next: UnhandledException
                      - condition: ${e.code == 404}
                        next: pageNotFound
                      - condition: ${e.code == 403}
                        next: authError
                    next: UnhandledException
                - pageNotFound:
                    return: "Page not found."
                - authError:
                    return: "Authentication error."
                - UnhandledException:
                    raise: ${e}
        - queryCompleted:
            return: ${queryResult.body}
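
    One caveat: as far as I know, the connector waits for the job to complete, but a plain HTTP POST to jobs.insert returns as soon as BigQuery accepts the job, so queryResult.body may describe a job that is still PENDING or RUNNING. If you need the workflow to block until the query finishes, a polling subworkflow along these lines should work (a sketch; BQJobsWaitForJob is a hypothetical helper, and the jobId comes from queryResult.body.jobReference.jobId):

    #hypothetical helper: poll jobs.get until BigQuery reports the job as DONE
    BQJobsWaitForJob:
      params: [project, jobId]
      steps:
        - poll:
            call: http.get
            args:
              url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/"+project+"/jobs/"+jobId}
              auth:
                type: OAuth2
            result: jobStatus
        - checkDone:
            switch:
              - condition: ${jobStatus.body.status.state == "DONE"}
                next: done
        - backoff:
            call: sys.sleep
            args:
              seconds: 5
            next: poll
        - done:
            return: ${jobStatus.body}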