I'm trying to wrap my head around the Producer-Consumer pattern in the context of tasks that can be indefinite but are also suspendable and can be resumed at a later time. So far, a lot of the examples of this pattern deal with Consumers pulling single, one-off tasks from some sort of queue that's filled by Producers.
But what if said tasks can be indefinite? For example, say the task at hand is to count the occurrences of certain words in a stream of strings. The stream can be infinitely long (or finite), and given more streams of strings than workers to process them, a mechanism may be formed where workers alternate on processing streams: pausing their work on their current stream so another worker can continue it, and moving on to another stream.
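To make the alternation concrete, this is roughly what I mean, as a single-threaded sketch. It assumes each string is one word and models a stream as an `Iterator<String>`; the class name `RoundRobinCounter` and the `slice` parameter are made up for illustration. Each stream gets at most `slice` strings per turn and, if it still has data, goes to the back of the line.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class RoundRobinCounter {
    // Takes at most `slice` strings from each stream per turn, then moves on.
    // Unfinished streams are re-appended to the back of the deque ("paused").
    // Returns how many strings across all streams were in `targets`.
    static long countRoundRobin(List<Iterator<String>> streams, Set<String> targets, int slice) {
        Deque<Iterator<String>> pending = new ArrayDeque<>(streams);
        long count = 0;
        while (!pending.isEmpty()) {
            Iterator<String> stream = pending.pollFirst();
            for (int i = 0; i < slice && stream.hasNext(); i++) {
                if (targets.contains(stream.next())) {
                    count++;
                }
            }
            if (stream.hasNext()) {
                pending.addLast(stream); // resume this stream on a later turn
            }
        }
        return count;
    }
}
```

With an infinite stream this loop never finishes, but every stream still gets a turn; the part I can't figure out is how to spread this across several worker threads.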
I'm not sure how to properly design and implement an efficient mechanism for this problem around the Producer-Consumer pattern, or around any other pattern/method.
So far, I've followed this guide: https://www.baeldung.com/java-producer-consumer-problem
But the issue is in Section 4.3: what if, instead of a Double, it was something like a Queue<String> that needed to be pushed back onto the BlockingQueue, turning my Consumers into Producers as well?
> But the issue is on Section 4.3, if instead of a Double, it was something like Queue<String> that needed to be pushed back onto the BlockingQueue, turning my Consumers to also be Producers.
I can see your consumers (running in a standard ExecutorService) being able to access a work-queue. They can dequeue a Queue<String> object, work on the strings for a while, and then "pause" the job by putting that Queue back on the work-queue and grabbing another job from the front of the work-queue to process. One question is whether there is state that needs to be kept as the Queue<String> is being processed. In that case you will need a wrapping JobStatus object that holds the queue and any accumulators or other information, so the next worker thread can pick up where the last one left off. This might be a good idea anyway so you don't get confused by the queue of queues.
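A minimal sketch of such a JobStatus for the word-counting example, assuming each string is a single word. The fields and the `processSome` method are my own illustration, not from any library. A worker would call `processSome` with some budget and put the job back on the work-queue if it returns false.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Wraps one stream together with everything needed to resume it later.
// Only one worker holds a given JobStatus at a time; handing it over through a
// BlockingQueue makes the previous worker's updates visible to the next one.
public class JobStatus {
    private final Queue<String> source;                          // strings still to process
    private final Set<String> targets;                           // words being counted
    private final Map<String, Integer> counts = new HashMap<>(); // carried across pauses
    private long processed;                                      // strings consumed so far

    public JobStatus(Queue<String> source, Set<String> targets) {
        this.source = source;
        this.targets = targets;
    }

    // Processes up to maxItems strings; returns true if the source is now exhausted.
    public boolean processSome(int maxItems) {
        for (int i = 0; i < maxItems; i++) {
            String word = source.poll();
            if (word == null) {
                return true;
            }
            processed++;
            if (targets.contains(word)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return source.isEmpty();
    }

    public Map<String, Integer> counts() { return counts; }

    public long processed() { return processed; }
}
```

Note that for a stream that is still being fed, "empty" only means "empty right now", so you may want a separate flag on the JobStatus for "the producer has finished".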
> workers alternate on processing streams - pausing their work on their current stream for another worker to continue before processing another stream
I think the trick here is how the workers figure out whether they should pause their work. Maybe they could process 10000 items from the stream and then put it back on the work-queue? That logic could be built into the JobStatus object. You could also look at the size() of the work-queue to see whether things are backing up. An alternative would be to have multiple thread pools for different-sized jobs, so the fast ones have their own queue for immediate servicing while the big ones chew in the background.
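Both ideas can be written as small policy functions. This is a hypothetical sketch: the class name `PausePolicy`, the per-turn item budget, and the rule "only pause if another job is actually waiting" are my assumptions, not a prescribed design.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

public class PausePolicy {
    private final int itemBudget; // e.g. 10_000 strings per turn

    public PausePolicy(int itemBudget) {
        this.itemBudget = itemBudget;
    }

    // Pause once the per-turn budget is used up, but only if other jobs are
    // waiting (size() > 0); otherwise re-queuing would just hand the job back.
    public boolean shouldPause(int itemsThisTurn, BlockingQueue<?> workQueue) {
        return itemsThisTurn >= itemBudget && !workQueue.isEmpty();
    }

    // Alternative: route jobs to separate pools by (estimated) size, so small
    // jobs are serviced immediately while big ones run in the background.
    public static ExecutorService choosePool(long estimatedSize, long smallJobLimit,
                                             ExecutorService fastPool, ExecutorService bigPool) {
        return estimatedSize <= smallJobLimit ? fastPool : bigPool;
    }
}
```

Checking for a backlog before pausing avoids pointless re-queuing when a worker has nothing else to do.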
But given your specifications, I can see code along the lines of the following. It pauses by putting the job on the end of the work-queue; I'm not sure that's what you want.
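```java
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamJobs {
    // queue that when seen means that the jobs are done (compared by identity, never processed)
    static final Queue<String> END_MARKER = new LinkedList<>();

    static void processAll(List<Queue<String>> queuesForProcessing, int numThreads) {
        final BlockingQueue<Queue<String>> workQueue = new LinkedBlockingQueue<>();
        // jobs not yet fully drained; a paused job still counts as remaining
        final AtomicInteger remaining = new AtomicInteger(queuesForProcessing.size());
        ExecutorService threadPool = Executors.newCachedThreadPool();
        // now start up our worker threads
        for (int i = 0; i < numThreads; i++) {
            threadPool.submit(new WorkerThread(workQueue, remaining));
        }
        // no new tasks are accepted; the threads exit when the END_MARKER is seen
        threadPool.shutdown();
        // now add the jobs to the work-queue
        workQueue.addAll(queuesForProcessing);
        // END_MARKER is posted by the worker that finishes the last job, because a paused
        // job re-queued behind an END_MARKER added here up front could be abandoned
        if (queuesForProcessing.isEmpty()) {
            workQueue.add(END_MARKER);
        }
    }
}

class WorkerThread implements Runnable {
    private final BlockingQueue<Queue<String>> workQueue;
    private final AtomicInteger remaining;

    WorkerThread(BlockingQueue<Queue<String>> workQueue, AtomicInteger remaining) {
        this.workQueue = workQueue;
        this.remaining = remaining;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Queue<String> job = workQueue.take();
                if (job == StreamJobs.END_MARKER) {
                    // need to tell the others to quit as well
                    workQueue.put(job);
                    break;
                }
                // now work on the queue job
                while (true) {
                    String foo = job.poll();
                    if (foo == null) {
                        // done with this job; whoever finishes the last one posts END_MARKER
                        if (remaining.decrementAndGet() == 0) {
                            workQueue.put(StreamJobs.END_MARKER);
                        }
                        break;
                    }
                    // do the work on the String
                    // ...
                    // now that we are done, see if we should pause this job
                    if (testToSeeIfWeShouldPauseJob()) {
                        // put it back on the queue and then loop to get another job
                        workQueue.put(job);
                        break;
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and let the thread exit
        }
    }

    // placeholder pause policy: e.g. pause after N items, or when workQueue.size() is large
    private boolean testToSeeIfWeShouldPauseJob() {
        return false;
    }
}
```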
Hopefully this helps.