Search code examples
sqlscalaprepared-statementapache-flinkflink-streaming

Retry SQL UPDATE query after waiting x seconds


I am using a RichSinkFunction to execute a SQL UPDATE query on an existing record. This function assumes that a record already exists on the DB. However, in certain scenarios the existing record is late.

To overcome the issue of record lateness, I have added a Thread.sleep() to make the function wait and retry the DB update.

Sample code provided below for reference.

class RichSinkFact extends RichSinkFunction[FulfillmentUsagesOutput]{

private def updateFactUpcoming(
    r: FulfillmentUsagesOutput,
    schemaName: String
  ): Unit = {

    var updateStmt: PreparedStatement = null
    val sqlStatement =
      s"""
         |UPDATE $schemaName.$factUpcomingTableName
         |SET unit_id = ?
         |WHERE pledge_id = ?
         |;
         |
      """.stripMargin

    try {
      updateStmt = connection.prepareStatement(sqlStatement)
      updateStmt.setLong(1, r.unit_id)
      updateStmt.setString(2, r.pledge_id)
      val rows = updateStmt.executeUpdate()

      if(rows == 0) {
        logger.warn(s"Retrying update for ${r}")
        //retry update
        Thread.sleep(retrySleepTime)
        val rows = updateStmt.executeUpdate()
        if(rows == 0){
          //raise error
          logger.error(s"Unable to update row: ${r}")
        }
      }

    } finally {
      if (updateStmt != null) {
        updateStmt.close()
      }
    }
  }
}

Question : Since Flink already implements other timers and uses internal time processing functions, is this the right way of retrying a DB update?

Thanks


Solution

  • Thanks to David for the original idea behind this approach. Sink.ProcessingTimeService is only present from Flink 1.12 onwards. So, for anyone on a previous version of Flink looking to implement a similar solution, ProcessingTimeCallback can be used to implement timers in a Sink application.

    I have included a sample approach here https://gist.github.com/soumoks/f73694c64169c8b3494ba1842fa61f1b