scala google-bigquery apache-beam spotify-scio

Apache Beam Saving to BigQuery using Scio and explicitly specifying TriggeringFrequency


I'm using Spotify Scio to create a Scala Dataflow pipeline that is triggered by a Pub/Sub message. It reads from our private DB and then inserts the information into BigQuery.

The problem is:

  • I need to delete the previous data
  • For this, I need to use write disposition WRITE_TRUNCATE
  • But the job is automatically registered as streaming, so I get the following error: WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
  • So I need to manually change the pipeline to write in batches, specifying a triggering frequency.

Until now I have had the following pipeline:

sc
  .customInput("Job Trigger", inputIO)
  .map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
  .flatten
  .withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
  .groupBy(_.ssoId)
  .map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
  .filter(_.isSuccess)
  .map(_.get)
  .saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)

I can't find a way to specify a triggering frequency when I use the Scio API (saveAsBigQuery).

It's only available in the native Beam API:

BigQueryIO
  .write()
  .withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
  .to(bqTableName)
  .withSchema(getSchema)
  .withCreateDisposition(CREATE_NEVER)
  .withWriteDisposition(WRITE_TRUNCATE)

If I use BigQueryIO, I'll have to use sc.pipeline.apply instead of my current pipeline.

Is there a way to integrate BigQueryIO into my current pipeline, or to specify withTriggeringFrequency on the current pipeline?


Solution

  • Scio currently doesn't support specifying the method used to load data into BigQuery. As a result, STREAMING_INSERTS is chosen automatically for unbounded collections, and that method can't support truncation. You therefore need to fall back to Beam's BigQueryIO, specifying a triggering frequency (withTriggeringFrequency(...)) and a method (withMethod(Method.FILE_LOADS)).

    To integrate it in your Scio pipeline, you can simply use saveAsCustomOutput. An example can also be found here: https://spotify.github.io/scio/io/Type-Safe-BigQuery#using-type-safe-bigquery-directly-with-beams-io-library
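
    As a rough sketch of how the two pieces could fit together, assuming the end of the pipeline is an SCollection[TableRow] (as it is after the final .map(_.get) in the question): the object name TruncatingBigQuerySink, the method name write, and the transform label are hypothetical, and the one-day frequency is simply copied from the question. The withNumFileShards(1) call is an assumption on my part; Beam's documentation pairs it with withTriggeringFrequency, but check what your Beam version requires.

```scala
import com.google.api.services.bigquery.model.{TableRow, TableSchema}
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, Method, WriteDisposition}
import org.joda.time.Duration

// Hypothetical helper: wraps Beam's BigQueryIO so it can end a Scio pipeline.
object TruncatingBigQuerySink {

  def write(rows: SCollection[TableRow], tableSpec: String, schema: TableSchema): Unit = {
    // Plain Beam write transform, configured for periodic load jobs
    // instead of the streaming inserts picked for unbounded input.
    val bqWrite = BigQueryIO
      .writeTableRows()
      .to(tableSpec)
      .withSchema(schema)
      .withMethod(Method.FILE_LOADS)
      .withTriggeringFrequency(Duration.standardDays(1)) // value taken from the question
      .withNumFileShards(1) // assumption: set explicitly alongside the triggering frequency
      .withCreateDisposition(CreateDisposition.CREATE_NEVER)
      .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)

    // saveAsCustomOutput applies any Beam PTransform as a named Scio sink.
    rows.saveAsCustomOutput("Write to BigQuery", bqWrite)
    ()
  }
}
```

    With something like this in place, the last line of the pipeline in the question (.saveAsBigQuery(...)) would be replaced by passing the preceding SCollection to TruncatingBigQuerySink.write(rows, tableName, getSchema); the rest of the Scio pipeline stays as it is.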