I have a producer that reads blocks of text from the disk. Multiple consumers are doing computations on that blocks.
I would like producer to pause reading data from the disk if there are more that n blocks currently being computed over.
Have put it in pseudocode to illustrate what I would like to achieve.
// "produceBlocks" reads blocks from disk one by one
// and feeds them to lambda
produceBlocks(block -> {
// (!) if activeCounter exceeds a THRESHOLD, then pause
executorService.submit(() -> {
activeCounter.incrementAndGet();
// do some work
activeCounter.decrementAndGet();
});
});
"I would like producer to pause reading data from the disk if there are more that n blocks currently being computed over."
The real task descrition is slightly different: the producer, before reading data from the disk, should aquire a permit to do so.
If your producer is a thread, then natural facility to manage permits is Semaphore. Initialy it contains n permits. The producer, to read a block, aguires 1 permit with Semaphore::aquire
. When the block is processed by the consumer, consumer releases 1 permit with Semaphore::release
.
Another approach is to combine blocks and permits. Similary to the output queue from producer to consumer, create an input blocking queue for blocks. Initualy put there n blocks. The producer, to read a block, first takes the next block from that queue. Consumer, after handling a block, returns it to the input queue.