Tags: google-bigquery, google-cloud-dataflow, apache-beam, google-cloud-pubsub, spotify-scio

Maintaining a global state within Apache Beam


We have a PubSub topic with events sinking into BigQuery (though the particular DB is almost irrelevant here). Events can arrive with new, previously unknown properties that should eventually end up as separate BigQuery columns.

So, basically I have two questions here:

  1. What is the right way to maintain global state within a Pipeline (in my case, the set of properties encountered so far)?
  2. What would be a good strategy for buffering/holding the stream of events from the moment a new property is encountered until the ALTER TABLE has been executed?

Right now I've tried the following (I'm using Spotify's Scio):

import org.apache.beam.sdk.transforms.windowing.IntervalWindow
import org.joda.time.Duration

rows
  .withFixedWindows(Duration.millis(duration)) // batch the stream into fixed windows
  .withWindow[IntervalWindow]                  // pair each row with its window
  .swap                                        // key by window
  .groupByKey                                  // collect all rows of a window
  .map { case (window, rowsIterable) =>
    // scan the whole window for properties we haven't seen before
    val newRows = findNewProperties(rowsIterable)
    mutateTableWith(newRows)                   // run ALTER TABLE before writing
    rowsIterable
  }
  .flatMap(identity)                           // flatten back to individual rows
  .saveAsBigQuery()

But this is terribly inefficient: at the very least we need to load the whole rowsIterable into memory and traverse it.


Solution

  • We're building the very same kind of project, and we're following this approach with a refreshing side input containing the schemas (reloaded at intervals from BQ). So basically:

    1. On a side input, load the schemas from BQ
    2. Stream data into BQ using streaming inserts so that you can actually do something else with the rows that fail to insert (i.e. when they have a new, unknown property); see the first sketch below
    3. Save those failed rows somewhere else (Datastore?) to process them later (in another job, for example)
    4. That recovery job will issue the schema changes, which will eventually be picked up by the main pipeline's refreshing side input (step 1)
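
    For steps 2 and 3, BigQuery's streaming-insert API lets you recover the rejected rows instead of failing the pipeline. Below is a minimal sketch of that dead-letter path; it drops down to Beam's BigQueryIO directly because, at least in the Scio versions we used, saveAsBigQuery doesn't expose the WriteResult. Assumptions: rows is an SCollection[TableRow], sc is the surrounding ScioContext, and the table spec is a placeholder:

      import com.google.api.services.bigquery.model.TableRow
      import com.spotify.scio.values.SCollection
      import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
      import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}
      import org.apache.beam.sdk.io.gcp.bigquery.{InsertRetryPolicy, WriteResult}

      // Stream into BQ, capturing the rows the service rejects
      // (e.g. because no column exists yet for a new property).
      val result: WriteResult = rows.internal.apply(
        "WriteToBQ",
        BigQueryIO
          .writeTableRows()
          .to("my-project:my_dataset.events") // placeholder table spec
          .withCreateDisposition(CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
          .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
          .withExtendedErrorInfo()
      )

      // Wrap the failed inserts back into an SCollection and park them
      // somewhere (Datastore, GCS, another topic) for the recovery job.
      val failedRows: SCollection[TableRow] =
        sc.wrap(result.getFailedInsertsWithErr).map(_.getRow)

    The recovery job (steps 3 and 4) then inspects those parked rows and issues the schema changes.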

    I have an example of a job with that refreshing side input approach here.
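
    And here is a minimal sketch of that refreshing side input in Scio, following Beam's slowly-changing side input pattern (a GenerateSequence ticker plus a re-triggered global window). Assumptions: fetchSchemas() is a hypothetical helper that reads the current table schemas from BQ, and the 5-minute tick interval is arbitrary:

      import com.spotify.scio.values.WindowOptions
      import org.apache.beam.sdk.io.GenerateSequence
      import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, Repeatedly}
      import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
      import org.joda.time.Duration

      // Tick every 5 minutes, re-read the schemas on each tick, and expose
      // the latest value to the main stream as a singleton side input.
      val schemaSide = sc
        .customInput("SchemaTicker",
          GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
        .withGlobalWindow(WindowOptions(
          trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
          allowedLateness = Duration.ZERO
        ))
        .map(_ => fetchSchemas()) // hypothetical helper: read table schemas from BQ
        .asSingletonSideInput

      rows
        .withSideInputs(schemaSide)
        .map { (row, ctx) =>
          val schemas = ctx(schemaSide)
          // decide per row: known properties go straight to BQ,
          // unknown ones take the failed-insert/recovery path above
          row
        }
        .toSCollection

    Once the recovery job has run the ALTER TABLE, the next tick picks up the widened schema, and previously failing rows start inserting cleanly.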