
Watermarks in a RichParallelSourceFunction


I am implementing a SourceFunction that reads data from a database. The job should be resumable after it is stopped or crashes (i.e., via savepoints and checkpoints), with the data being processed exactly once.

What I have so far:

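import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import com.typesafe.scalalogging.LazyLogging

@SerialVersionUID(1L)
class JDBCSource(private val clientConfig: Serializable, private val waitTimeMs: Long = 1000L) extends
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging {

    // Marked @transient: the client is created in open() instead of being serialized with the function.
    @transient var client: JDBCClient = _
    @volatile var isRunning: Boolean = true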

    override def stop(): Unit = {
        this.isRunning = false
    }

    override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        client = new JDBCClient
    }

    override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {

        while (isRunning){
            val statement = client.getConnection.createStatement()
            val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")

            while (resultSet.next()) {
                val name: String = resultSet.getString("name")
                val timestamp: Long = resultSet.getLong("timestamp")

                ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)

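            }
            // Release the JDBC resources, then wait before polling the table again.
            resultSet.close()
            statement.close()
            Thread.sleep(waitTimeMs)
        }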
    }

    override def cancel(): Unit = {
        isRunning = false
    }
}

How can I make sure that I only fetch the rows that haven't been processed yet? I assumed the ctx variable would expose the current watermark, so that I could change my query to something like:

select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark

But it doesn't have any methods for that. Any ideas on how to solve this problem would be appreciated.


Solution

  • You have to implement CheckpointedFunction so that you can manage the checkpointed state yourself. The documentation of the interface is pretty comprehensive, but if you need an example, I advise you to look at an existing implementation.

    In essence, your function must implement CheckpointedFunction#snapshotState to store the state it needs using Flink's managed state; when the job is restored, it reads that same state back in CheckpointedFunction#initializeState. In your case, that state would be the position you have read up to (for example, the timestamp of the last emitted row), which you can then use in the WHERE clause of your query instead of a watermark.
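
    To make this concrete, here is a minimal sketch of how the source from the question could look with CheckpointedFunction, using the legacy SourceFunction API the question is based on. Several things in it are assumptions rather than anything from the question: the Event case class, the class name CheckpointedJdbcSource, the jdbcUrl parameter, the state name "last-timestamp", and the use of a plain java.sql connection instead of the question's client class. It also assumes that timestamps are unique and only ever increase; if several rows can share a timestamp, you would need to track a unique key instead. Note that it extends RichSourceFunction (parallelism 1) rather than RichParallelSourceFunction, because with several parallel subtasks each one would read the whole table unless you also partition the query.

```scala
import java.sql.{Connection, DriverManager}

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

import scala.collection.JavaConverters._

// Hypothetical event type, standing in for the Event class of the question.
case class Event(name: String, timestamp: Long)

// Hypothetical source: polls MYTABLE and remembers the last emitted timestamp.
class CheckpointedJdbcSource(jdbcUrl: String, waitTimeMs: Long = 1000L)
    extends RichSourceFunction[Event] with CheckpointedFunction {

  @volatile private var isRunning = true
  @transient private var connection: Connection = _
  @transient private var offsetState: ListState[java.lang.Long] = _

  // Timestamp of the last row that was emitted; rows up to it are skipped.
  private var lastTimestamp: Long = Long.MinValue

  // Called before open(); restores the offset after a failure or savepoint.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    offsetState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("last-timestamp", classOf[java.lang.Long]))
    if (context.isRestored) {
      offsetState.get().asScala.foreach { ts =>
        lastTimestamp = math.max(lastTimestamp, ts.longValue)
      }
    }
  }

  // Called on every checkpoint; stores the current offset in operator state.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    offsetState.clear()
    offsetState.add(Long.box(lastTimestamp))
  }

  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection(jdbcUrl)
  }

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
    val statement = connection.prepareStatement(
      "SELECT name, timestamp FROM MYTABLE WHERE timestamp > ? ORDER BY timestamp")
    try {
      while (isRunning) {
        statement.setLong(1, lastTimestamp)
        val resultSet = statement.executeQuery()
        try {
          while (isRunning && resultSet.next()) {
            val name = resultSet.getString("name")
            val timestamp = resultSet.getLong("timestamp")
            // Emit and advance the offset atomically with respect to checkpoints,
            // so a snapshot never sees an emitted row without its offset update.
            ctx.getCheckpointLock.synchronized {
              ctx.collectWithTimestamp(Event(name, timestamp), timestamp)
              lastTimestamp = timestamp
            }
          }
        } finally {
          resultSet.close()
        }
        // Wait outside the checkpoint lock before polling again.
        Thread.sleep(waitTimeMs)
      }
    } finally {
      statement.close()
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = {
    if (connection != null) connection.close()
  }
}
```

    The important part is the checkpoint lock: the record is emitted and the offset is updated inside the same synchronized block, so the offset stored in a checkpoint always matches what has been sent downstream. Keep in mind that this only makes the source's state consistent; end-to-end exactly-once also depends on the sink you use.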