google-cloud-platform google-cloud-sql bq google-cloud-data-transfer

Data Transfer from BQ to Cloud SQL


What is the best way to transfer all records from a BigQuery table to a Cloud SQL table on a daily basis? The expected volume is more than 255,801,312 records (about 255 million) per day. I know we can create Dataflow pipelines from BigQuery to Cloud SQL, but with this much data they would run for hours. Is there a better solution that can be implemented within Google Cloud?


Solution

  • Here is a working example using Workflows. You need to grant enough permissions to your workflow's service account (Cloud SQL Admin, BigQuery Data Viewer and Job User, Cloud Storage Admin), and the table must already exist in your Cloud SQL instance (I tested with MySQL).

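    An article with more detail is in progress. The workflow exports the query result to Cloud Storage as CSV files, then lists those files and imports them into Cloud SQL one at a time. Replace the bucket, the project ID, the Cloud SQL instance name (mysql in my case), the query, the table name, and the database schema with your own values.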

    main:
      steps:
        - assignStep:
            assign:
              - bucket: "TODO"
              - projectid: "TODO"
              - prefix: "workflow-import/export"
              - listResult:
                  nextPageToken: ""
        - export-query:
            call: googleapis.bigquery.v2.jobs.query
            args:
              projectId: ${projectid}
              body:
                query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + prefix + "*.csv', format='CSV', overwrite=true,header=false) AS SELECT id, email FROM `copy_dataset.name_test`"}
                useLegacySql: false
        - importfiles:
            call: import_files
            args:
              pagetoken: ${listResult.nextPageToken}
              bucket: ${bucket}
              prefix: ${prefix}
              projectid: ${projectid}
            result: listResult
        - missing-files:
            switch:
              - condition:  ${"nextPageToken" in listResult}
                next: importfiles
    
    
    import_files:
      params:
        - pagetoken
        - bucket
        - prefix
        - projectid
      steps:
        - list-files:
            call: googleapis.storage.v1.objects.list
            args:
              bucket: ${bucket}
              pageToken: ${pagetoken}
              prefix: ${prefix}
            result: listResult
        - process-files:
            for:
              value: file
              in: ${listResult.items}
              steps:
                - wait-import:
                    call: load_file
                    args:
                      projectid: ${projectid}
                      importrequest:
                        importContext:
                          uri: ${"gs://" + bucket + "/" + file.name}
                          database: "test_schema"
                          fileType: CSV
                          csvImportOptions:
                            table: "workflowimport"
        - return-step:
            return: ${listResult}
    
    
    load_file:
      params: [importrequest,projectid]
      steps:
        - callImport:
            call: http.post
            args:
              url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/mysql/import"}
              auth:
                type: OAuth2
              body: ${importrequest}
            result: operation
        - chekoperation:
            switch:
              - condition: ${operation.body.status != "DONE"}
                next: wait
            next: completed
        - completed:
            return: "done"
        - wait:
            call: sys.sleep
            args:
              seconds: 5
            next: getoperation
        - getoperation:
            call: http.get
            args:
              url: ${operation.body.selfLink}
              auth:
                type: OAuth2
            result: operation
            next: chekoperation
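
To make the control flow of the workflow above easier to follow, here is a minimal Python sketch of the same three loops: paging through the exported files, starting one import per file, and polling each operation until its status is DONE. It is only an illustration, not a replacement for the workflow. The function names and the injected callables (`list_page`, `start_import`, `get_operation`) are hypothetical stand-ins for the Cloud Storage list call, the Cloud SQL import call, and the operation lookup. The sketch also uses `None` as the first page token where the workflow uses an empty string.

```python
import time
from typing import Callable, Dict, Iterator, Optional


def iter_exported_files(list_page: Callable[[Optional[str]], Dict]) -> Iterator[str]:
    """Yield object names page by page until no nextPageToken is returned."""
    token: Optional[str] = None
    while True:
        page = list_page(token)
        for item in page.get("items", []):
            yield item["name"]
        token = page.get("nextPageToken")
        if not token:
            break


def wait_until_done(operation: Dict,
                    get_operation: Callable[[str], Dict],
                    poll_seconds: float = 5.0) -> Dict:
    """Re-read the operation via its selfLink until its status is DONE."""
    while operation["status"] != "DONE":
        time.sleep(poll_seconds)
        operation = get_operation(operation["selfLink"])
    return operation


def import_all(list_page: Callable[[Optional[str]], Dict],
               start_import: Callable[[str], Dict],
               get_operation: Callable[[str], Dict],
               poll_seconds: float = 5.0) -> int:
    """Import every exported file sequentially; return how many were imported."""
    count = 0
    for name in iter_exported_files(list_page):
        operation = start_import(name)  # hypothetical wrapper around the import call
        wait_until_done(operation, get_operation, poll_seconds)
        count += 1
    return count
```

Like the workflow, the sketch waits for each import to finish before starting the next one, so the files are loaded strictly in sequence.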
    

    More detail in my Medium article.
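
The values to replace in the workflow end up in two payloads: the EXPORT DATA statement sent to BigQuery and the import request sent to Cloud SQL for each file. As a rough sketch of how those payloads are assembled, the Python helpers below mirror the string concatenation in the workflow. The helper names and the sample arguments are assumptions made for illustration only.

```python
from typing import Dict


def build_export_query(bucket: str, prefix: str, select_sql: str) -> str:
    """Build the EXPORT DATA statement the same way the workflow expression does."""
    uri = f"gs://{bucket}/{prefix}*.csv"
    return (
        f"EXPORT DATA OPTIONS( uri='{uri}', format='CSV', "
        f"overwrite=true,header=false) AS {select_sql}"
    )


def build_import_request(bucket: str, object_name: str,
                         database: str, table: str) -> Dict:
    """Build the import request body for one exported CSV file."""
    return {
        "importContext": {
            "uri": f"gs://{bucket}/{object_name}",
            "database": database,
            "fileType": "CSV",
            "csvImportOptions": {"table": table},
        }
    }


if __name__ == "__main__":
    # Sample values only; "my-bucket" is a placeholder, not a real bucket.
    print(build_export_query(
        "my-bucket", "workflow-import/export",
        "SELECT id, email FROM `copy_dataset.name_test`"))
    print(build_import_request(
        "my-bucket", "workflow-import/export000000000000.csv",
        "test_schema", "workflowimport"))
```

Because the export writes the files without a header row, the column order of the SELECT has to match the column order of the target table.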