I have a Flink application that processes a stream of data and writes results to a database. The stream is keyed by id. A database operation can take quite a long time (e.g. 3 minutes), and only one operation may run at a time for a given id, to avoid database locks. At the moment the sink cannot run in parallel, so its parallelism has to be set to 1.
process
.keyBy(new ProductKeySelector())
.addSink(new ProductSink())
.setParallelism(1);
I want to block the stream only for the id that is currently being processed and pick up other events, out of order, in the meantime. Events with the same id should wait until the running operation ends and then be processed. It would behave like a blocking queue per id.
Update:
example:
kafkaKeyedStream
.map(new MapToProductType())
.keyBy(new ProductKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ProductAggregateFunction())
.addSink(new ProductSink());
From the Kafka source I receive this data:
As you can see, the data is grouped by the window function (the first value in each record is the key) and the results are processed by the sink function. For this example, let's say processing takes 20 s for each part of the data. With one thread this is not a problem, because the next data simply waits its turn. But if I set parallelism = 2, the first part of the data is still being processed by one thread when, 10 s later, another thread starts processing the next part with the same key as the first. This creates a lock on the database.
What I would like is this: while one thread is processing data for a specific key, a second thread should not take data for the same key. It should either take data for a different key or do nothing if there is nothing else.
If your DB operation could take 3 minutes, you don't want to use a regular JDBC sink. Instead, look at Flink's Async I/O support. You'd want to keyBy(id), and then inside your custom RichAsyncFunction operator you can keep track of whether you've got an active DB request for a given id.
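One way to do the "keep track of an active request per id" part is sketched below in plain Java, without any Flink dependency. It is only an illustration under assumptions: PerKeyGate, PerKeyGateDemo and slowWrite are hypothetical names, not Flink classes, and the sleep stands in for the real database call. The idea is to remember the last pending future per id and chain each new request behind it, so requests for the same id run one at a time while other ids proceed concurrently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical helper (not a Flink class): at most one running task per key. */
class PerKeyGate<K> {
    // Last submitted task per key; no entry means nothing is in flight for that key.
    private final Map<K, CompletableFuture<?>> tails = new HashMap<>();
    private final Executor executor;

    PerKeyGate(Executor executor) {
        this.executor = executor;
    }

    /** Queues the task behind any earlier task for the same key; other keys are unaffected. */
    synchronized <T> CompletableFuture<T> submit(K key, Supplier<T> task) {
        CompletableFuture<?> previous =
                tails.getOrDefault(key, CompletableFuture.completedFuture(null));
        // handleAsync fires once the previous task has ended, whether it succeeded or failed.
        CompletableFuture<T> next =
                previous.handleAsync((ignored, error) -> task.get(), executor);
        tails.put(key, next);
        // Drop the entry if this task is still the last one queued for the key.
        next.whenComplete((value, error) -> release(key, next));
        return next;
    }

    private synchronized void release(K key, CompletableFuture<?> finished) {
        tails.remove(key, finished);
    }
}

class PerKeyGateDemo {
    // Stand-in for the slow database write from the question (assumption: 200 ms).
    static String slowWrite(String id, int n) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("wrote " + id + "#" + n);
        return id + "#" + n;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PerKeyGate<String> gate = new PerKeyGate<>(pool);
        CompletableFuture<String> a1 = gate.submit("A", () -> slowWrite("A", 1));
        CompletableFuture<String> a2 = gate.submit("A", () -> slowWrite("A", 2)); // waits for a1
        CompletableFuture<String> b1 = gate.submit("B", () -> slowWrite("B", 1)); // not blocked by A
        CompletableFuture.allOf(a1, a2, b1).join();
        pool.shutdown();
    }
}
```

Inside a RichAsyncFunction, asyncInvoke(input, resultFuture) could call something like gate.submit(id, ...) and then complete the ResultFuture from the returned future with complete(...) or completeExceptionally(...). Because the stream is keyed by id, all events for one id reach the same subtask, so an in-memory map per subtask is probably enough. Note that the async operator's timeout would need to cover the time a request spends waiting behind earlier requests for the same id, not just the database call itself.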