I'm using Spotify Scio to create a Scala Dataflow pipeline which is triggered by a Pub/Sub message. It reads from our private DB and then inserts information into BigQuery.
The problem is: since the pipeline is triggered by a Pub/Sub message, the input is an unbounded PCollection, so writing with WRITE_TRUNCATE fails with:

WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection

This disposition is only allowed for a Batch pipeline, or when specifying a triggering frequency. So until now I had the following pipeline:
sc
.customInput("Job Trigger", inputIO)
.map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
.flatten
.withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
.groupBy(_.ssoId)
.map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
.filter(_.isSuccess)
.map(_.get)
.saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)
I can't seem to find a way to specify a triggering frequency when I use the Scio API (saveAsBigQuery). It's only present in the native Beam API:
BigQueryIO
.write()
.withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
.to(bqTableName)
.withSchema(getSchema)
.withCreateDisposition(CREATE_NEVER)
.withWriteDisposition(WRITE_TRUNCATE)
If I use BigQueryIO directly, I'll have to use sc.pipeline.apply instead of my current pipeline. Is there a way to integrate BigQueryIO into my current pipeline, or otherwise specify withTriggeringFrequency on the current pipeline?
Scio currently doesn't support specifying the method used for loading data into BigQuery. Since that's not configurable, STREAMING_INSERTS is automatically used for unbounded collections, and streaming inserts can't support truncation. Therefore, you need to fall back to Beam's BigQueryIO, specifying a triggering frequency (withTriggeringFrequency(...)) and the file-loads method (withMethod(Method.FILE_LOADS)).

To integrate it into your Scio pipeline, you can simply use saveAsCustomOutput.
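A minimal sketch of what that could look like, assuming rows is the SCollection[TableRow] produced by your existing transforms (tableName and getSchema are the values from your snippet). Note that Beam also requires an explicit shard count (withNumFileShards) when FILE_LOADS is combined with a triggering frequency:

import com.google.api.services.bigquery.model.TableRow
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, Method, WriteDisposition}
import org.joda.time.Duration

val rows: SCollection[TableRow] = ??? // the output of your existing transforms

rows.saveAsCustomOutput(
  "Save to BigQuery",
  BigQueryIO
    .writeTableRows()
    .to(tableName)
    .withSchema(getSchema)
    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
    .withMethod(Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardDays(1))
    .withNumFileShards(1) // required when using a triggering frequency with FILE_LOADS
)

saveAsCustomOutput applies the given PTransform to the underlying PCollection, so the rest of the pipeline stays in the Scio API.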
An example can also be found here: https://spotify.github.io/scio/io/Type-Safe-BigQuery#using-type-safe-bigquery-directly-with-beams-io-library