Search code examples
javamultithreadingconcurrencyjava.util.concurrent

How can I implement this concurrent structure without the queue?


I have a situation in my application where events come in and the thread that handles them (signalling thread) must signal to another thread (working thread), thus far in an idle state, that it can run some code. Once the working thread is done it should wait to be signalled again. It is possible that events will arrive while the working thread is working. In this case it should move on and keep working immediately. One action by the working thread does enough work for any amount of incoming events, so there is no need to work once per event, just once as soon as possible after each event. Example correct behavior:

event comes in
worker thread starts work
worker thread finishes work
event comes in
worker thread starts work
event comes in
event comes in
worker thread finishes work
worker thread starts work
worker thread finishes work

4 events, 3 periods of work. It's an unfortunate but unavoidable requirement that the signalling thread cannot block while handling the event. I have implemented this at the moment using a BlockingQueue, which has the pointless side effect of filling itself up even though the contents are not interesting or even looked at. I was expecting to be able to make this work using CountDownLatch or CyclicBarrier or similar but I haven't been able to find a way. Here is my implementation:

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    private static final class MyBarrier {
        private BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();
        void await() throws InterruptedException {
            queue.take();
            queue.clear();
        }
        void signal() {
            queue.add(true);
        }
    }

    private static Random random = new Random(0);

    private static void sleepForMax(int maxMillis) {
        sleep(random.nextInt(maxMillis));
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MyBarrier myBarrier = new MyBarrier();
        final ExecutorService singallingThread = Executors.newSingleThreadExecutor();
        singallingThread.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                sleepForMax(1_000); // simulate period between events arriving
                myBarrier.signal();
                System.out.println("Signalling work to be done");
            }
            System.out.println("Thread interrupted");
        });
        final ExecutorService workingThread = Executors.newSingleThreadExecutor();
        workingThread.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    System.out.println("Waiting for work");
                    myBarrier.await();
                } catch (InterruptedException e) {
                    break;
                }
                System.out.println("Doing work...");
                sleepForMax(3_000); // simulate work being done
                System.out.println("Work done");
            }
            System.out.println("Thread interrupted");
        });
        sleep(10_000);
        singallingThread.shutdownNow();
        workingThread.shutdownNow();
    }

}

What's the better way to do this?


Solution

  • When I run your code with your implementation that uses Phaser, having changed the sleep times so that signalling occurs every 800 ms and processing takes 1000 ms, I get e.g. this output:

    00008: Waiting for work
    00808: Signalling work to be done
    00808: Doing work...                      <-- worker starts working
    01608: Signalling work to be done         <-- signal came, so there's more work
    01808: Work done
    01809: Waiting for work                   <-- waits for work...
    02409: Signalling work to be done         <-- ...for 600 ms, until the next signal
    02409: Doing work...
    

    (The number to the left is milliseconds since start. Also, you can reproduce it with your code with random delays, but that's harder to reproduce and see there.)

    If I understood it correctly, this is wrong. E.g. imagine what happens if signals stop coming.

    Your code can probably work with this adjustment for your specific case:

    private static final class MyBarrierWithPhaser {
    
        private final Phaser phaser = new Phaser(1);
        private int lastObservedPhase; // Phaser has initial phase 0
    
        void await() throws InterruptedException {
            // only works for 1 producer 1 worker; lastObservedPhase is kind of thread-local
            lastObservedPhase = phaser.awaitAdvanceInterruptibly(lastObservedPhase);
        }
    
        void signal() {
            phaser.arrive();
        }
    }
    

    With this, the worker records the last phase it advanced to, and if the signal thread "arrives" before the next awaitAdvanceInterruptibly, then the Phaser phase gets updated, and when worker tries to wait using a stale phase, it will progress immediately; if the signal thread does not arrive before awaitAdvanceInterruptibly, then the worker will wait until the signal thread finally arrives.

    Using simpler synchronization primitives, I can think of how to implement it using the synchronized-wait()-notify() mechanism:

    private static final class MyBarrierWithSynchronized {
    
        private boolean hasWork = false;
    
        synchronized void await() throws InterruptedException {
            while (!hasWork) {
                wait();
            }
            hasWork = false;
        }
    
        synchronized void signal() {
            hasWork = true;
            notifyAll(); // or notify() if we are sure there is 1 signal thread and 1 worker thread
        }
    }
    

    It has a couple of drawbacks: await() won't be interrupted if the thread is waiting to enter it. Also, some don't like synchronizing on this, I kept it so in order to be short. This can be rewritten using the java.util.concurrent.* analogues, this implementation will not have both of these drawbacks:

    private static final class MyBarrierWithLock {
    
        private boolean hasWorkFlag = false;
    
        private final Lock lock = new ReentrantLock();
        private final Condition hasWorkCond = lock.newCondition();
    
        void await() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                while (!hasWorkFlag) {
                    hasWorkCond.await();
                }
                hasWorkFlag = false;
            } finally {
                lock.unlock();
            }
        }
    
        void signal() {
            lock.lock();
            try {
                hasWorkFlag = true;
                hasWorkCond.signalAll(); // or signal() if we are sure there is 1 signal thread and 1 worker thread
            } finally {
                lock.unlock();
            }
        }
    }