I am using a RichSinkFunction to execute a SQL UPDATE query on an existing record. This function assumes that the record already exists in the DB. However, in certain scenarios the record arrives late, so it does not exist yet when the UPDATE runs.
To work around this lateness, I have added a Thread.sleep() so that the function waits and then retries the DB update.
Sample code is provided below for reference.
```scala
import java.sql.PreparedStatement

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// connection, logger, factUpcomingTableName, retrySleepTime and the
// invoke() that calls this method are not shown in this snippet.
class RichSinkFact extends RichSinkFunction[FulfillmentUsagesOutput] {
  private def updateFactUpcoming(
      r: FulfillmentUsagesOutput,
      schemaName: String
  ): Unit = {
    var updateStmt: PreparedStatement = null
    val sqlStatement =
      s"""
         |UPDATE $schemaName.$factUpcomingTableName
         |SET unit_id = ?
         |WHERE pledge_id = ?
         |;
         |
      """.stripMargin
    try {
      updateStmt = connection.prepareStatement(sqlStatement)
      updateStmt.setLong(1, r.unit_id)
      updateStmt.setString(2, r.pledge_id)
      val rows = updateStmt.executeUpdate()
      if (rows == 0) {
        logger.warn(s"Retrying update for $r")
        // Block the sink thread for retrySleepTime ms, then retry the update once
        Thread.sleep(retrySleepTime)
        val retriedRows = updateStmt.executeUpdate()
        if (retriedRows == 0) {
          // Still no matching row: report the failure
          logger.error(s"Unable to update row: $r")
        }
      }
    } finally {
      if (updateStmt != null) {
        updateStmt.close()
      }
    }
  }
}
```
Question: Since Flink already provides timers and its own processing-time services, is this the right way to retry a DB update?
Thanks
Thanks to David for the original idea behind this approach. Sink.ProcessingTimeService is only available from Flink 1.12 onwards, so anyone on an earlier Flink version who wants to implement a similar solution can use ProcessingTimeCallback to implement timers in a sink.
I have included a sample approach here: https://gist.github.com/soumoks/f73694c64169c8b3494ba1842fa61f1b
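As a rough illustration of the timer-based idea, here is a minimal, framework-free Scala sketch; it is not the code from the gist. The `RetryingUpdater` class and its methods are hypothetical, `tryUpdate` stands in for the JDBC UPDATE (returning the affected row count), and `onProcessingTime` only mirrors the shape of a processing-time callback rather than being Flink's API. Instead of sleeping, a failed update is parked with a due time and retried when the timer fires, up to `maxAttempts` attempts in total.

```scala
import scala.collection.mutable

// Hypothetical sketch: timer-driven retries instead of Thread.sleep.
// `tryUpdate` stands in for the JDBC UPDATE and returns the affected row count.
final class RetryingUpdater[R](
    tryUpdate: R => Int,
    retryDelayMs: Long,
    maxAttempts: Int
) {
  // A record waiting for its next attempt, due at `dueAt` (ms)
  private final case class Pending(record: R, dueAt: Long, attempts: Int)

  private val pending = mutable.ListBuffer.empty[Pending]
  // Records that still matched no row after maxAttempts attempts
  val failed = mutable.ListBuffer.empty[R]

  // Called once per incoming record (what the sink's invoke() would do)
  def process(record: R, now: Long): Unit =
    if (tryUpdate(record) == 0) reschedule(record, now, attempts = 1)

  // Park the record for a later retry, or give up once maxAttempts is reached
  private def reschedule(record: R, now: Long, attempts: Int): Unit =
    if (attempts >= maxAttempts) failed += record
    else pending += Pending(record, now + retryDelayMs, attempts)

  // Mirrors the shape of a processing-time callback: retry every due record
  def onProcessingTime(now: Long): Unit = {
    val (due, notDue) = pending.partition(_.dueAt <= now)
    pending.clear()
    pending ++= notDue
    due.foreach { p =>
      if (tryUpdate(p.record) == 0) reschedule(p.record, now, p.attempts + 1)
    }
  }

  // Earliest time a timer should fire, if anything is pending
  def nextTimer: Option[Long] =
    if (pending.isEmpty) None else Some(pending.map(_.dueAt).min)

  def pendingCount: Int = pending.size
}
```

In an actual sink, `nextTimer` would be used to register a processing-time timer, and the timer callback would call `onProcessingTime`, so the sink thread never blocks. Note that the pending buffer here lives only in memory: records waiting for a retry would be lost on a failure unless they are also kept in checkpointed state.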