google-cloud-platform, google-bigquery, google-cloud-dataflow, google-cloud-spanner

GCP: What is the best option to set up a periodic data pipeline from Spanner to BigQuery?


Task: We have to set up a periodic sync of records from Spanner to BigQuery. Our Spanner database has a relational table hierarchy.

Options considered: I was thinking of using Dataflow templates to set up this data pipeline.

  • Option 1: Set up a job with the Dataflow template 'Cloud Spanner to Cloud Storage Text' and then another with the Dataflow template 'Cloud Storage Text to BigQuery' (a sample launch command is sketched after this list). Con: the first template works only on a single table, and we have many tables to export.

  • Option 2: Use the 'Cloud Spanner to Cloud Storage Avro' template, which exports the entire database. Con: I only need to export selected tables within a database, and I don't see a template to import Avro into BigQuery.
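
For reference, a single run of the per-table export template from Option 1 would look roughly like the following. The job, bucket, instance, and database names are placeholders, and the exact template path and parameter names may differ between template versions, so treat this as a sketch rather than a copy-paste command:

    gcloud dataflow jobs run export-singers \
        --gcs-location gs://dataflow-templates/latest/Spanner_to_GCS_Text \
        --region us-central1 \
        --parameters spannerProjectId=my-project,spannerInstanceId=my-instance,spannerDatabaseId=my-database,spannerTable=Singers,textWritePrefix=gs://my-bucket/exports/singers

One such job (plus a matching 'Cloud Storage Text to BigQuery' job) would be needed for every table, which is the scaling concern noted above.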

Question: Please suggest the best option for setting up this pipeline.


Solution

  • Use a single Dataflow pipeline to do it in one shot/pass. Here's an example I wrote using the Java SDK to help get you started. It reads from Spanner, transforms each record into a BigQuery TableRow using a ParDo, and then writes to BigQuery at the end. Under the hood it stages the data in GCS, but that's all abstracted away from you as a user.


    package org.polleyg;
    
    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.spanner.Struct;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
    import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE;
    
    /**
     * Reads the Singers table from Cloud Spanner, converts each row to a
     * BigQuery TableRow, and loads it into BigQuery.
     */
    public class TemplatePipeline {
        public static void main(String[] args) {
            PipelineOptionsFactory.register(DataflowPipelineOptions.class);
            DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
            Pipeline pipeline = Pipeline.create(options);
            // Read all rows from the Singers table in Spanner.
            PCollection<Struct> records = pipeline.apply("read_from_spanner",
                    SpannerIO.read()
                            .withInstanceId("spanner-to-dataflow-to-bq")
                            .withDatabaseId("the-dude")
                            .withQuery("SELECT * FROM Singers"));
            // Convert each Spanner row (Struct) into a BigQuery TableRow.
            records.apply("convert-2-bq-row", ParDo.of(new DoFn<Struct, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    TableRow row = new TableRow();
                    row.set("id", c.element().getLong("SingerId"));
                    row.set("first", c.element().getString("FirstName"));
                    row.set("last", c.element().getString("LastName"));
                    c.output(row);
                }
            })).apply("write-to-bq", BigQueryIO.writeTableRows()
                    .to(String.format("%s:spanner_to_bigquery.singers", options.getProject()))
                    .withCreateDisposition(CREATE_IF_NEEDED)
                    .withWriteDisposition(WRITE_TRUNCATE)
                    .withSchema(getTableSchema()));
            pipeline.run();
        }
    
        // Schema of the destination BigQuery table (id, first, last).
        private static TableSchema getTableSchema() {
            List<TableFieldSchema> fields = new ArrayList<>();
            fields.add(new TableFieldSchema().setName("id").setType("INTEGER"));
            fields.add(new TableFieldSchema().setName("first").setType("STRING"));
            fields.add(new TableFieldSchema().setName("last").setType("STRING"));
            return new TableSchema().setFields(fields);
        }
    }
    
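    The question mentions a hierarchy of tables rather than just one. Staying with this single-pipeline approach, one way to cover several tables in the same job is to add one read/convert/write branch per table inside main(). The sketch below assumes the same instance and database as above; tableRowFnFor(...) and schemaFor(...) are hypothetical per-table helpers you would still have to write (the Singers versions are the DoFn and getTableSchema() shown above), and java.util.Arrays needs to be imported:

    // Sketch only: one branch per table inside the same pipeline.
    // tableRowFnFor(...) and schemaFor(...) are hypothetical per-table helpers.
    for (String table : Arrays.asList("Singers", "Albums")) {
        pipeline.apply("read_" + table, SpannerIO.read()
                .withInstanceId("spanner-to-dataflow-to-bq")
                .withDatabaseId("the-dude")
                .withQuery("SELECT * FROM " + table))
                .apply("convert_" + table, ParDo.of(tableRowFnFor(table)))
                .apply("write_" + table, BigQueryIO.writeTableRows()
                        .to(String.format("%s:spanner_to_bigquery.%s", options.getProject(), table.toLowerCase()))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_TRUNCATE)
                        .withSchema(schemaFor(table)));
    }

    Giving each transform a table-specific name keeps the step names unique within the pipeline graph, so all branches can run in the same job.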

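    The logs below are from a run on the DirectRunner (note the direct-runner-worker threads). Assuming a standard Maven project with the exec plugin available, launching it could look something like this, with the project and temp bucket taken from the log output; swap in --runner=DataflowRunner (plus a --region) to run it on Dataflow instead:

    mvn compile exec:java \
        -Dexec.mainClass=org.polleyg.TemplatePipeline \
        -Dexec.args="--project=grey-sort-challenge --tempLocation=gs://spanner-dataflow-bq/tmp --runner=DirectRunner"
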
    Output logs:

    00:10:54,011 0    [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BatchLoads - Writing BigQuery temporary files to gs://spanner-dataflow-bq/tmp/BigQueryWriteTemp/beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12/ before loading them.
    00:10:59,332 5321 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter - Opening TableRowWriter to gs://spanner-dataflow-bq/tmp/BigQueryWriteTemp/beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12/c374d44a-a7db-407e-aaa4-fe6aa5f6a9ef.
    00:11:01,178 7167 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.WriteTables - Loading 1 files into {datasetId=spanner_to_bigquery, projectId=grey-sort-challenge, tableId=singers} using job {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge}, attempt 0
    00:11:02,495 8484 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Started BigQuery job: {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge}.
    bq show -j --format=prettyjson --project_id=grey-sort-challenge beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0
    00:11:02,495 8484 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.WriteTables - Load job {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge} started
    00:11:03,183 9172 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - Still waiting for BigQuery job beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, currently in status {"state":"RUNNING"}
    bq show -j --format=prettyjson --project_id=grey-sort-challenge beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0
    00:11:05,043 11032 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl - BigQuery job {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge} completed in state DONE
    00:11:05,044 11033 [direct-runner-worker] INFO  org.apache.beam.sdk.io.gcp.bigquery.WriteTables - Load job {jobId=beam_load_templatepipelinegrahampolley0531141053eff9d0d4_3dd2ba3a1c0347cf860241ddcd310a12_b4b4722df4326c6f5a93d7824981dc73_00001_00000-0, location=australia-southeast1, projectId=grey-sort-challenge} succeeded. Statistics: {"completionRatio":1.0,"creationTime":"1559311861461","endTime":"1559311863323","load":{"badRecords":"0","inputFileBytes":"81","inputFiles":"1","outputBytes":"45","outputRows":"2"},"startTime":"1559311862043","totalSlotMs":"218","reservationUsage":[{"name":"default-pipeline","slotMs":"218"}]}
    
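    To double-check the result (the load statistics above report "outputRows":"2"), you could query the destination table, for example:

    bq query --project_id=grey-sort-challenge --use_legacy_sql=false 'SELECT id, first, last FROM spanner_to_bigquery.singers'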
