I have the producer code that generates the random character:
public class Producer implements Runnable {
@Override
public void run() {
Stream<Character> generate = Stream.generate(this::generateRandomCharacter).limit(15);
generate.forEach(character -> {
MyEvent myEvent = new MyEvent();
myEvent.setMesage(character + "");
LOG.info("Producer: " + name + " is waiting to transfer...");
try {
boolean added = transferQueue.tryTransfer(myEvent, 4000, TimeUnit.MILLISECONDS);
if (added) {
numberOfProducedMessages.incrementAndGet();
LOG.info("Producer: " + name + " transferred element: A");
} else {
LOG.info("can not add an element due to the timeout");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
The consumer code is provided:
public class Consumer implements Runnable {
private static final Logger LOG = Logger.getLogger(Consumer.class.getName());
private final TransferQueue<MyEvent> transferQueue;
private final String name;
final int numberOfMessagesToConsume;
final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
Consumer(TransferQueue<MyEvent> transferQueue, String name, int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
}
@Override
public void run() {
while (true){
try {
LOG.info("Consumer: " + name + " is waiting to take element...");
MyEvent element = transferQueue.take();
longProcessing(element);
System.out.println("Consumer: " + name + " received element with messgae : " + element.getMesage());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(MyEvent element) throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(5);
}
}
This is the call for the consumer/ produce:
TransferQueue<Event> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer( transferQueue, "1", 2);
Consumer consumer = new Consumer(transferQueue, "1", 2);
exService.execute(producer);
exService.execute(consumer);
boolean isShutDown = exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
if (!isShutDown) {
exService.shutdown();
}
The producer will create only a limited number of characters that the consumer will consume. How do I know if the producer is finished the character generation?
I think about implementing a timeout to know if the producer is not sending any more characters, but there might be a better option for this implementation.
There are various alternative ways to achieve this:
Use a special type of event to show that the producer has finished. (This is basically what the answer by Krzysztof Cichocki suggests). Pros: simplicity. Cons: you have to make sure that whatever special event you choose to signify "finished" cannot possibly be a real event emitted by the producer.
Use a count. It looks like this is what your code is already trying to do. For example, pass 15 in the numberOfMessagesToConsume
argument to the consumer constructor, and the Run()
method then stops once it has consumed 15 messages. Pros: simplicity. Cons: inflexibility, and you might not know how many messages the producer will produce beforehand.
Monitor the state of the producer thread. For example, the consumer can check while (producerThread.isAlive()) {...}
. The producer thread will terminate when it has finished producing the messages. Pros: flexibility. Cons: you don't want the consumer to know about the producer thread, as that's too much coupling. For example, you might start the producer using new Thread(...)
or you might use an ExecutorService
or a CompletableFuture
. The consumer shouldn't need to know.
One way of mitigating around the disadvantage of option 3 is to pass a function to the consumer to decouple the testing of producer state from the threading details:
Constructor:
Consumer(TransferQueue<MyEvent> transferQueue, String name, BooleanSupplier isProducerStillProducing)
Call the constructor with a lambda:
new Consumer(transferQueue, name, () -> producerThread.isAlive())
Test it in the run()
method:
while (isProducerStillProducing.getAsBoolean()) { ... }