public class SemActionPlace {
public SemMonitor StartConsumerProducer() {
SemMonitor monitor = new SemMonitor();
List<Thread> threads = new LinkedList<>();
Thread p1 = new Thread(new Producer(monitor), "P1");
p1.start();
Thread c1 = new Thread(new Consumer(monitor), "C-odd");
c1.start();
Thread c2 = new Thread(new Consumer(monitor), "C-even");
c2.start();
threads.add(p1);
threads.add(c1);
threads.add(c2);
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return monitor;
}
}
the code work just fine when I start thread through start() - join(), however, I failed to find mistake when I try to do the same through the executor service. It's important for me to save names of the threads and mutual monitor. Please, tell me how can I execute the threads through the executor service ?
The piece of code below doen't work properly. Where is mistake ?
public SemMonitor StartConsumerProducer() {
SemMonitor monitor = new SemMonitor();
Thread p1 = new Thread(new Producer(monitor), "P1");
Thread c1 = new Thread(new Consumer(monitor), "C-odd");
Thread c2 = new Thread(new Consumer(monitor), "C-even");
ThreadPoolExecutor service = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
service.execute(p1);
service.execute(c1);
service.execute(c2);
System.out.println(service.getCompletedTaskCount());
System.out.println(service.getCompletedTaskCount());
return monitor;
}
I need one simple thing from the executor server is that I wanna that it works like simple start() - join() solution works ( first piece of code ) .
class Consumer implements Runnable {
private final SemMonitor monitor;
Consumer(SemMonitor monitor) {
this.monitor = monitor;
}
@Override
public void run() {
long t = System.currentTimeMillis();
long end = t + 1000;
while (System.currentTimeMillis() < end) {
consoleLog(monitor.activeThreadName,false);
if (/*monitor.semaphore.tryAcquire() && */monitor.activeThreadName.equals( Thread.currentThread().getName())) {
try {
consoleLog(String.valueOf(Thread.currentThread().getName() + " was notified "),monitor.enableLog);
monitor.semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
monitor.get(Thread.currentThread().getName());
}
try{
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer implements Runnable {
private SemMonitor monitor;
Producer(SemMonitor monitor) {
this.monitor = monitor;
}
@Override
public void run() {
String threadNameToWork;
Integer randNum;
long t = System.currentTimeMillis();
long end = t + 500;
while (System.currentTimeMillis() < end) {
if (monitor.semaphore.tryAcquire()) {
randNum = ((Number) (random() * 100)).intValue();
if (randNum % 2 == 0) {
threadNameToWork = "C-even";
} else {
threadNameToWork = "C-odd";
}
try {
monitor.putItem(randNum, Thread.currentThread().getName(), threadNameToWork);
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
class Monitor {
private double currItem;
private boolean isConsumersShouldWaitProducer = true;
private boolean isConsuming = false;
private String threadNameToWork;
synchronized void putRandNumber(double producerOutput, String producerName, String threadNameToWork) {
if (isConsumersShouldWaitProducer) {
System.out.println("Consumers wait for new Production");
}
this.threadNameToWork = threadNameToWork;
currItem = producerOutput;
System.out.println("Producer " + producerName + " putRandNumber Item: " + currItem);
if (currItem > 3) {
notifyAll();
isConsumersShouldWaitProducer = false;
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
synchronized void consumeRandNumber(String threadName) {
if (isConsumersShouldWaitProducer) {
try {
this.wait();
} catch (InterruptedException e) {
System.out.println("Caught Interrupted Exception while waiting to consume currItem: " + e.getMessage());
}
}
if (isConsuming) {
try {
this.wait();
isConsuming = true;
} catch (InterruptedException e) {
System.out.println("Caught Interrupted Exception while waiting to consume currItem: " + e.getMessage());
}
}
switch (Thread.currentThread().getName()) {
/*switch (threadNameToWork) {*/
case "C-odd":
isConsuming = true;
if (currItem % 2 != 0 && threadNameToWork.equals(Thread.currentThread().getName())) {
consumeItems(threadName);
}
isConsuming = false;
notifyAll();
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
case "C-even":
isConsuming = true;
if (currItem % 2 == 0 && threadNameToWork.equals(Thread.currentThread().getName())) {
consumeItems(threadName);
}
isConsuming = false;
notifyAll();
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
default:
break;
}
}
private synchronized void consumeItems(String threadName) {
isConsumersShouldWaitProducer = true;
String randNumType = "*odd/even*";
System.out.println("Consumer:" + threadName + " consumed " + randNumType + " Items = " + currItem);
notifyAll();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
You want to do something with Thread name right? The thread name you created in using new Thread will not pass into ExecutorService
, but this will
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("thread-%d").build()
Then
ExecutorService exec = Executors.newSingleThreadExecutor(namedThreadFactory);
Now you have thread with name as thread-1
, thread-2
OR set thread name in your run() method
Thread.currentThread().setName(myName)
To make sure your thread is finished, add this before you return the monitor,
service.shutdown();
while (!service.awaitTermination(10, TimeUnit.SECONDS)) {
log.info("Awaiting completion of threads.");
}