
Streaming data from CloudSql into Dataflow


We are currently exploring how to process a large amount of data stored in a Google Cloud SQL database (MySQL) using Apache Beam/Google Dataflow.

The database stores about 200GB of data in a single table.

We successfully read rows from the database using JdbcIO, but so far this has only been possible if we LIMIT the number of rows queried. Otherwise we run into memory issues. I assume that by default a SELECT query tries to load all resulting rows into memory.

What is the idiomatic approach for this? Batching the SQL queries? Streaming the results?

We tried adjusting the fetch size of the statement executed, without much success.

This is what our JDBC read setup looks like:

JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  statementPreparator = statement => statement.setFetchSize(100),
  rowMapper = result => result.getString(1)
)
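
A possible reason the fetch size made no difference: as far as I know, MySQL Connector/J buffers the whole result set by default and only honours a positive fetch size when the connection is opened with useCursorFetch=true. Below is a minimal sketch of adding that property to a JDBC URL. The helper name withCursorFetch and the URLs used with it are made up for illustration; it is not part of scio or Beam.

```scala
object CursorFetchUrl {
  // Hypothetical helper: appends useCursorFetch=true to a MySQL JDBC URL
  // so that a positive statement.setFetchSize(n) is respected by Connector/J.
  def withCursorFetch(jdbcUrl: String): String =
    if (jdbcUrl.contains("useCursorFetch=")) jdbcUrl // already set, leave untouched
    else {
      // Use "&" if the URL already carries query parameters, "?" otherwise
      val separator = if (jdbcUrl.contains("?")) "&" else "?"
      s"$jdbcUrl${separator}useCursorFetch=true"
    }
}
```

The resulting URL would go wherever the connection options take their JDBC URL. I have not verified how this behaves on Dataflow workers.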

I haven't found any resources on streaming results from SQL so far.

EDIT

I'm going to list a few approaches I took, so others can learn something (for example, how not to do it). For a bit more context, the database table in question is really badly structured: it has a column containing a JSON string, an id column (primary key), plus an added and a modified column (both of type TIMESTAMP). At the time of the first approach it had no further indices. The table contains 25 million rows. So this is probably more a database issue than an Apache Beam/JDBC issue. But nevertheless:

Approach 1 (above) - Query everything

Basically it looked like this:

val readOptions = JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  rowMapper = result => result.getString(1)
)

context
  .jdbcSelect(readOptions)
  .map(/*...*/)

This worked if I added a LIMIT to the query, but it was obviously very slow.

Approach 2 - Offset pagination

val queries = List(
  "SELECT data from raw_data LIMIT 5000 OFFSET 0",
  "SELECT data from raw_data LIMIT 5000 OFFSET 5000",
  "SELECT data from raw_data LIMIT 5000 OFFSET 10000"
  // ...
)

context
  .parallelize(queries)
  .map(query => {
      val connection = DriverManager.getConnection(/* */)
      val statement = connection.prepareStatement(query)
      val result = statement.executeQuery()

      makeIterable(result) // <-- creates an Iterator[String]
  })
  .flatten
  .map(/* processing */)

This worked somewhat better, though I quickly learned that a LIMIT _ OFFSET _ combination also starts scanning from the first row. So each subsequent query took longer, and the query times eventually became far too long.

Approach 2.5 - Offset pagination with ordering

Like the above approach, but we created an index on the added column and updated the query to

SELECT data FROM raw_data ORDER BY added LIMIT 5000 OFFSET x

This sped things up, but eventually the query time grew too long.
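
For comparison, here is a rough sketch of paging by key range instead of by OFFSET, so that each query can seek directly to its rows through the primary key index. It assumes the id column is numeric and reasonably dense, which is a guess about this table; the names KeysetRanges, IdRange, split and toQuery are invented for the example.

```scala
object KeysetRanges {
  // Half-open id range: fromId inclusive, untilId exclusive
  final case class IdRange(fromId: Long, untilId: Long)

  // Splits [minId, maxId] into consecutive chunks of at most chunkSize ids.
  // Assumption: ids are numeric; gaps only make some chunks smaller.
  def split(minId: Long, maxId: Long, chunkSize: Long): List[IdRange] = {
    require(chunkSize > 0, "chunkSize must be positive")
    if (minId > maxId) Nil
    else
      (minId to maxId by chunkSize).map { from =>
        IdRange(from, math.min(from + chunkSize, maxId + 1))
      }.toList
  }

  // Builds a query that filters on the primary key instead of using OFFSET
  def toQuery(range: IdRange): String =
    s"SELECT data FROM raw_data WHERE id >= ${range.fromId} AND id < ${range.untilId}"
}
```

The generated queries could be handed to context.parallelize in place of the LIMIT/OFFSET list, with minId and maxId taken from a SELECT MIN(id), MAX(id) query beforehand.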

Approach 3 - No Beam/Dataflow

val connection = DriverManager.getConnection(/* */)
val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)

val rs = statement.executeQuery("SELECT data FROM raw_data")

while(rs.next()) {
  writer writeLine rs.getString(1)
}

This streams the result set back row by row and writes the rows into files. It ran for about 2 hours for all 25 million records. Finally. It would be great if someone could point out how this solution can be achieved with Beam.
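
One way this might carry over to Beam is to wrap the streaming result set in a lazy Iterator, so that a pipeline step can emit rows one at a time instead of collecting them first. The following is only a sketch under that assumption and has not been run on Dataflow; StreamingRows, cursorIterator and streamQuery are names made up here.

```scala
import java.sql.{Connection, ResultSet}

object StreamingRows {
  // Turns any forward-only cursor into an Iterator.
  // close() is called exactly once, when advance() first returns false.
  // Note: if advance() or read() throws, close() is NOT called in this sketch.
  def cursorIterator[T](advance: () => Boolean, read: () => T, close: () => Unit): Iterator[T] =
    new Iterator[T] {
      private var fetched = false // a row is positioned but not yet returned
      private var done = false    // the cursor is exhausted and closed

      def hasNext: Boolean = {
        if (!fetched && !done) {
          if (advance()) fetched = true
          else { done = true; close() }
        }
        fetched
      }

      def next(): T = {
        if (!hasNext) throw new NoSuchElementException("cursor exhausted")
        fetched = false
        read()
      }
    }

  // Streams the first column of a query row by row, using the same
  // Connector/J streaming settings as Approach 3.
  // This sketch takes ownership of the connection and closes it at the end.
  def streamQuery(connection: Connection, query: String): Iterator[String] = {
    val statement =
      connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    statement.setFetchSize(Integer.MIN_VALUE)
    val rs = statement.executeQuery(query)
    cursorIterator(
      () => rs.next(),
      () => rs.getString(1),
      () => { rs.close(); statement.close(); connection.close() }
    )
  }
}
```

In a pipeline, streamQuery would presumably be called inside a flatMap over a collection of queries (for example one query per chunk of the table), opening one connection per query on the worker.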

BTW: Now that I have the raw data as CSV files, processing with Beam is a breeze. It's about 80GB of raw data, which can be transformed to another CSV format in about 5 minutes with autoscaling etc.


Solution

  • I think JdbcIO doesn't scale very well due to its inherent limitations (a single SELECT). I'm not aware of any streaming support between MySQL and Beam.

    You can probably dump your DB to a format that is easier for data processing systems to handle (e.g., CSV). Would that work for you?