I am new to using threads. In another class, an instance of the ConnectionMaster class (which extends Thread) is created and started. A Client object is passed to the ConnectionMaster object, which adds it to a list. The overridden run() method essentially listens for a client to be added to the list. It does "hear" when a Client object is added. However, although hasNext() returns true, next() throws an exception. What am I doing wrong?
The following methods are from class ConnectionMaster which extends Thread:
Constructor
public ConnectionMaster(){
    clients = new Vector<>();
    listIterator = clients.listIterator();
}
Public method for adding client objects to the list
@Override
public synchronized void addClient(Client client) {
    listIterator.add(client);
}
This is the overridden run() method of the Thread class. It continuously checks for elements added to the list.
@Override
public void run(){
    while(true){
        while(listIterator.hasNext()){
            processClient(listIterator.next()); //this is where error occurs
            listIterator.remove();
        }
        while(listIterator.hasPrevious()){
            processClient(listIterator.previous());
            listIterator.remove();
        }
    }
}
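For reference, here is a minimal single-threaded sketch of the ListIterator contract that the code above relies on. It does not reproduce the threading problem itself; it only shows, under the assumption of a fresh iterator over an empty Vector, that add() inserts the element before the cursor (so it is reachable through previous(), not next()) and that remove() is not legal directly after add(). The class name, method names, and the "client-1" string are hypothetical and used only for illustration.

```java
import java.util.ListIterator;
import java.util.Vector;

public class ListIteratorContractDemo {

    // Reports {hasNext, hasPrevious} for a fresh iterator right after add().
    static boolean[] stateAfterAdd() {
        Vector<String> clients = new Vector<>();
        ListIterator<String> it = clients.listIterator();
        it.add("client-1"); // the element is inserted BEFORE the cursor
        return new boolean[] { it.hasNext(), it.hasPrevious() };
    }

    // remove() is only legal directly after next() or previous().
    static boolean removeAfterAddFails() {
        Vector<String> clients = new Vector<>();
        ListIterator<String> it = clients.listIterator();
        it.add("client-1");
        try {
            it.remove();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        boolean[] state = stateAfterAdd();
        // prints "hasNext=false, hasPrevious=true"
        System.out.println("hasNext=" + state[0] + ", hasPrevious=" + state[1]);
        // prints "remove() after add() fails: true"
        System.out.println("remove() after add() fails: " + removeAfterAddFails());
    }
}
```

Even in a single thread the iterator keeps internal state (its cursor and the last element returned), and in the ConnectionMaster code that state is touched by two threads while only addClient() is synchronized.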
UPDATE: Thank you, OldCurmudgeon and Stephen C. Based on your feedback, I have modified my code as follows:
Constructor
public ConnectionMaster(){
    clients = new ArrayBlockingQueue<Client>(1024);
}
Method for receiving client objects
@Override
public synchronized void addClient(Client client) {
    try {
        clients.put(client);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
Listener
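@Override
public void run(){
    while(true){
        try {
            processClient((Client)clients.take());
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop listening instead of looping forever.
            Thread.currentThread().interrupt();
            return;
        }
    }
}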
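Putting the updated pieces together, a self-contained sketch might look like the following. It is only an illustration under several assumptions: the nested Client class, the body of processClient(), the processed list, and the demo() method are hypothetical stand-ins, since the real ones are not shown above. It also drops synchronized from addClient(), on the assumption that nothing else needs that lock, because the BlockingQueue does its own locking.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConnectionMasterSketch extends Thread {

    // Hypothetical stand-in for the Client class from the question.
    static final class Client {
        final String name;
        Client(String name) { this.name = name; }
    }

    private final BlockingQueue<Client> clients = new ArrayBlockingQueue<>(1024);

    // Illustration only: records processed names so the behaviour can be observed.
    final List<String> processed = new CopyOnWriteArrayList<>();

    // Blocks if the queue is full; the queue handles the locking itself.
    public void addClient(Client client) throws InterruptedException {
        clients.put(client);
    }

    // Placeholder for the real per-client work.
    private void processClient(Client client) {
        processed.add(client.name);
    }

    @Override
    public void run() {
        try {
            while (true) {
                processClient(clients.take()); // blocks until a client arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    }

    // Starts a listener, hands it two clients, then shuts it down via interrupt.
    static List<String> demo() throws InterruptedException {
        ConnectionMasterSketch master = new ConnectionMasterSketch();
        master.start();
        master.addClient(new Client("alice"));
        master.addClient(new Client("bob"));
        while (master.processed.size() < 2) {
            Thread.sleep(10); // crude wait, good enough for a demo
        }
        master.interrupt();
        master.join();
        return master.processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints "[alice, bob]"
    }
}
```

Interrupting the thread is one way to stop the listener; a sentinel ("poison pill") element placed on the queue is another.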
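This is a very strange way to implement Producer/Consumer. A ListIterator is not safe to share between threads, and here only addClient() is synchronized, so run() reads and modifies the iterator with no coordination at all. The usual way is to use a BlockingQueue.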
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TwoThreads {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("TwoThreads:Test");
        new TwoThreads().test();
    }

    // Sentinel value that marks the end of the list.
    private static final Integer End = -1;

    static class Producer implements Runnable {
        final BlockingQueue<Integer> queue;

        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 1000; i++) {
                    queue.add(i);
                    Thread.sleep(1);
                }
                // Finish the queue.
                queue.add(End);
            } catch (InterruptedException ex) {
                // Just exit.
            }
        }
    }

    static class Consumer implements Runnable {
        final BlockingQueue<Integer> queue;

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            boolean ended = false;
            while (!ended) {
                try {
                    // Blocks until an element is available.
                    Integer i = queue.take();
                    // Compare by value, not by reference.
                    ended = End.equals(i);
                    System.out.println(i);
                } catch (InterruptedException ex) {
                    ended = true;
                }
            }
        }
    }

    public void test() throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread pt = new Thread(new Producer(queue));
        Thread ct = new Thread(new Consumer(queue));
        // Start it all going.
        pt.start();
        ct.start();
        // Wait for it to finish.
        pt.join();
        ct.join();
    }
}
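One detail worth noting: the example above calls add() on an unbounded LinkedBlockingQueue, whereas a bounded queue such as ArrayBlockingQueue behaves differently once it is full. The following sketch, assuming a capacity of 1 purely for illustration (the class and method names are hypothetical), shows what add(), offer(), and the timed offer() do on a full queue. put() is not called on the full queue here because it would simply block until space became available.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueInsertDemo {

    // Describes what each insert method does when the queue is already full.
    static String describeFullQueue() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(1); // succeeds immediately; the queue is now full

        String addResult;
        try {
            queue.add(2); // add() throws when there is no capacity
            addResult = "accepted";
        } catch (IllegalStateException full) {
            addResult = "IllegalStateException";
        }

        boolean offered = queue.offer(2); // returns false without waiting
        boolean timedOffer = queue.offer(2, 50, TimeUnit.MILLISECONDS); // waits, then gives up

        return "add=" + addResult + ", offer=" + offered + ", timedOffer=" + timedOffer;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "add=IllegalStateException, offer=false, timedOffer=false"
        System.out.println(describeFullQueue());
    }
}
```

With a bounded queue, put() is usually the right choice for a producer that should wait for the consumer to catch up, while offer() suits a producer that would rather drop or retry than block.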