I am implementing a SourceFunction, which reads Data from a Database. The job should be able to be resumed if stopped or crushed (i.e savepoints and checkpoints) with the data being processed exactly once.
What I have so far:
@SerialVersionUID(1L)
class JDBCSource(private val waitTimeMs: Long) extends
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging{
@transient var client: PostGreClient = _
@volatile var isRunning: Boolean = true
val DEFAULT_WAIT_TIME_MS = 1000
def this(clientConfig: Serializable) =
this(clientConfig, DEFAULT_WAIT_TIME_MS)
override def stop(): Unit = {
this.isRunning = false
}
override def open(parameters: Configuration): Unit = {
super.open(parameters)
client = new JDBCClient
}
override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
while (isRunning){
val statement = client.getConnection.createStatement()
val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")
while (resultSet.next()) {
val event: String = resultSet.getString("name")
val timestamp: Long = resultSet.getLong("timestamp")
ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)
}
}
}
override def cancel(): Unit = {
isRunning = false
}
}
How can I make sure to only get the rows of the database which aren't processed yet?
I assumed the ctx
variable would have some information about the current watermark so that I could change my query to something like:
select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark
But it doesn't have any relevant methods for me. Any Ideas how to solve this problem would be appreciated
You have to implement CheckpointedFunction so that you can manage checkpointing by yourself. The documentation of the interface is pretty comprehensive but if you need an example I advise you to take a look at an example.
In essence, your function must implement CheckpointedFunction#snapshotState
to store the state you need using Flink's managed state and then, when performing a restore, it will read that same state in CheckpointedFunction#initializeState
.