community, I'm trying to solve this Producer/Consumer problem with 10 threads, and I got stuck at a point in implementing it.
The problem looks like this:
The program itself should take have a loop while inserting messages containing (id, timeout), in ascending order by id (1,2,3,4...) and should simply print the id of the message that comes out, in the same order it entered, like a queue.
For example in the photo above, the 3 messages Message(1,200), Message(2, 1000) and Message(3,20) are the first 3 messages that the producer will produce. Although the thread who has been assigned with Message(3,20) should be printed first (because it has the lowest timeout(20)), I want it to wait for the first message which has 200ms timeout to print, then wait again for the message2 with 1000ms to print, then print itself. So all in increasing order(maybe use the id as the ordering number?).
So far I've implemented this:
public class Main {
private static BlockingQueue<Message> queue = new ArrayBlockingQueue<>(5);
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
producer();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
consumer();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
public static void producer() throws InterruptedException {
while (true) {
queue.put(new Message());
}
}
public static void consumer() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.submit(queue.take());
}
executorService.shutdown();
}
}
and I have my Message class here:
public class Message implements Runnable {
public static int totalIds = 0;
public int id;
public int timeout;
public Random random = new Random();
public Message() {
this.id = totalIds;
totalIds++;
this.timeout = random.nextInt(5000);
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", timeout=" + timeout +
'}';
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "[RECEIVED] Message = " + toString());
try {
Thread.sleep(timeout);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "[DONE] Message = " + toString() + "\n");
}
}
So far it does everything ok except the part where the threads should wait for the one that has the priority id let's say... here is the first part of the output:
All tasks submitted
pool-1-thread-9[RECEIVED] Message = Message{id=13, timeout=1361}
pool-1-thread-10[RECEIVED] Message = Message{id=14, timeout=92}
pool-1-thread-3[RECEIVED] Message = Message{id=7, timeout=3155}
pool-1-thread-5[RECEIVED] Message = Message{id=9, timeout=562}
pool-1-thread-2[RECEIVED] Message = Message{id=6, timeout=4249}
pool-1-thread-1[RECEIVED] Message = Message{id=0, timeout=1909}
pool-1-thread-7[RECEIVED] Message = Message{id=11, timeout=2468}
pool-1-thread-4[RECEIVED] Message = Message{id=8, timeout=593}
pool-1-thread-8[RECEIVED] Message = Message{id=12, timeout=3701}
pool-1-thread-6[RECEIVED] Message = Message{id=10, timeout=806}
pool-1-thread-10[DONE] Message = Message{id=14, timeout=92}
pool-1-thread-10[RECEIVED] Message = Message{id=15, timeout=846}
pool-1-thread-5[DONE] Message = Message{id=9, timeout=562}
pool-1-thread-5[RECEIVED] Message = Message{id=16, timeout=81}
pool-1-thread-4[DONE] Message = Message{id=8, timeout=593}
pool-1-thread-4[RECEIVED] Message = Message{id=17, timeout=4481}
pool-1-thread-5[DONE] Message = Message{id=16, timeout=81}
pool-1-thread-5[RECEIVED] Message = Message{id=18, timeout=2434}
pool-1-thread-6[DONE] Message = Message{id=10, timeout=806}
pool-1-thread-6[RECEIVED] Message = Message{id=19, timeout=10}
pool-1-thread-6[DONE] Message = Message{id=19, timeout=10}
pool-1-thread-6[RECEIVED] Message = Message{id=20, timeout=3776}
pool-1-thread-10[DONE] Message = Message{id=15, timeout=846}
pool-1-thread-10[RECEIVED] Message = Message{id=21, timeout=2988}
pool-1-thread-9[DONE] Message = Message{id=13, timeout=1361}
pool-1-thread-9[RECEIVED] Message = Message{id=22, timeout=462}
pool-1-thread-9[DONE] Message = Message{id=22, timeout=462}
pool-1-thread-9[RECEIVED] Message = Message{id=23, timeout=3074}
pool-1-thread-1[DONE] Message = Message{id=0, timeout=1909}
pool-1-thread-1[RECEIVED] Message = Message{id=24, timeout=725}
pool-1-thread-7[DONE] Message = Message{id=11, timeout=2468}
One of my friends told me it should be done with semaphores (never worked with them) but I really don't know how to implement semaphores so that they do what I want.
Appreciate any leads on solving this!
As far as I understand, you need two things:
So, you can start the threads one-by-one, so as to let them run in parallel, but also maintain a FIFO queue with their order by ascending id, and just join
each thread in the sequence they were added to that queue.
Here is a demonstrating code on how you can do it:
import java.util.LinkedList;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class Main {
private static class Message implements Runnable {
private final TimeUnit sleepUnit;
private final long sleepAmount;
private final int id;
public Message(final int id,
final TimeUnit sleepUnit,
final long sleepAmount) {
this.sleepUnit = Objects.requireNonNull(sleepUnit);
this.sleepAmount = sleepAmount;
this.id = id;
}
@Override
public void run() {
try {
System.out.println(toString() + " started and waiting...");
sleepUnit.sleep(sleepAmount);
}
catch (final InterruptedException ix) {
System.out.println(toString() + " interrupted: " + ix);
}
}
@Override
public String toString() {
return "Message{" + id + ", " + sleepUnit + "(" + sleepAmount + ")}";
}
}
private static class Producer {
private final int parallelism;
private final Consumer<? super Producer> consumer;
public Producer(final int parallelism,
final Consumer<? super Producer> consumer) {
this.parallelism = parallelism;
this.consumer = Objects.requireNonNull(consumer);
}
public void produceWithExecutor() {
System.out.println("Producing with Executor...");
final Random rand = new Random();
final ExecutorService service = Executors.newFixedThreadPool(parallelism);
final LinkedList<Future> q = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
final Message msg = new Message(i, TimeUnit.MILLISECONDS, 500 + rand.nextInt(3000));
q.addLast(service.submit(msg, msg));
}
service.shutdown();
while (!q.isEmpty())
try {
System.out.println(q.removeFirst().get().toString() + " joined."); //Will wait for completion of each submitted task (in FIFO sequence).
}
catch (final InterruptedException ix) {
System.out.println("Interrupted: " + ix);
}
catch (final ExecutionException xx) {
System.out.println("Execution failed: " + xx);
}
consumer.accept(this);
}
public void produceWithPlainThreads() throws InterruptedException {
System.out.println("Producing with Threads...");
final Random rand = new Random();
final LinkedList<Thread> q = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
final Message msg = new Message(i, TimeUnit.MILLISECONDS, 500 + rand.nextInt(3000));
final Thread t = new Thread(msg, msg.toString());
t.start();
q.add(t);
}
while (!q.isEmpty()) {
final Thread t = q.removeFirst();
t.join(); //Will wait for completion of each submitted task (in FIFO sequence).
System.out.println(t.getName() + " joined.");
}
consumer.accept(this);
}
}
public static void main(final String[] args) throws InterruptedException {
final Consumer<Producer> consumer = producer -> System.out.println("Consuming.");
final int parallelism = 10;
new Producer(parallelism, consumer).produceWithExecutor();
new Producer(parallelism, consumer).produceWithPlainThreads();
}
}
As you can see, there are two producing implementations here: one with an ExecutorService
running all the submitted threads, and one with plain threads which are started (almost) at the same time.
This results in an output like so:
Producing with Executor...
Message{1, MILLISECONDS(692)} started and waiting...
Message{2, MILLISECONDS(1126)} started and waiting...
Message{0, MILLISECONDS(3403)} started and waiting...
Message{3, MILLISECONDS(1017)} started and waiting...
Message{4, MILLISECONDS(2861)} started and waiting...
Message{5, MILLISECONDS(2735)} started and waiting...
Message{6, MILLISECONDS(2068)} started and waiting...
Message{7, MILLISECONDS(947)} started and waiting...
Message{8, MILLISECONDS(1091)} started and waiting...
Message{9, MILLISECONDS(1599)} started and waiting...
Message{0, MILLISECONDS(3403)} joined.
Message{1, MILLISECONDS(692)} joined.
Message{2, MILLISECONDS(1126)} joined.
Message{3, MILLISECONDS(1017)} joined.
Message{4, MILLISECONDS(2861)} joined.
Message{5, MILLISECONDS(2735)} joined.
Message{6, MILLISECONDS(2068)} joined.
Message{7, MILLISECONDS(947)} joined.
Message{8, MILLISECONDS(1091)} joined.
Message{9, MILLISECONDS(1599)} joined.
Consuming.
Producing with Threads...
Message{0, MILLISECONDS(3182)} started and waiting...
Message{1, MILLISECONDS(2271)} started and waiting...
Message{2, MILLISECONDS(2861)} started and waiting...
Message{3, MILLISECONDS(2942)} started and waiting...
Message{4, MILLISECONDS(2714)} started and waiting...
Message{5, MILLISECONDS(1228)} started and waiting...
Message{6, MILLISECONDS(2000)} started and waiting...
Message{7, MILLISECONDS(2372)} started and waiting...
Message{8, MILLISECONDS(764)} started and waiting...
Message{9, MILLISECONDS(587)} started and waiting...
Message{0, MILLISECONDS(3182)} joined.
Message{1, MILLISECONDS(2271)} joined.
Message{2, MILLISECONDS(2861)} joined.
Message{3, MILLISECONDS(2942)} joined.
Message{4, MILLISECONDS(2714)} joined.
Message{5, MILLISECONDS(1228)} joined.
Message{6, MILLISECONDS(2000)} joined.
Message{7, MILLISECONDS(2372)} joined.
Message{8, MILLISECONDS(764)} joined.
Message{9, MILLISECONDS(587)} joined.
Consuming.
You can see in the output that in both cases the threads are started (almost) together via a loop, but are joined in a FIFO ordered manner. In the first case you can see that the threads may start in different order which is a side effect of starting threads themselves. In the second case with the plain threads, it happened that all the threads called their run
method in the order in which they were created and started, because this happens in a such a short amount of time. But the joining of each thread will always be in ascending id order according to this code. If you run this code multiple times, you may achieve for thread eg 2 to print in its run method before thread eg 1 in both cases, but the order we wait the threads to finish in the Producer
's methods will always end in ascending id order.
All the threads should exit/finish their run
method in ascending sleeping order and not in ascending id order. But the output will always be in ascending id order because of the way we are iterating over the queue and waiting them to join
in an orderly manner.
So if your want to obtain the result of each Thread
in an ascending id order, then the corresponding code would have to be in your Producer
's produce methods (after you join
each thread) and not in the end of each Message
's run
method (in order to avoid extra synchronization and inter-thread communication).