Search code examples
javamultithreadingrestsocketsservletcontextlistener

How to notify a specific thread in a multithread application


I'm developing a server app which receives RESTful requests from clients and forwards each one to a specific device as a UDP packet, sent from a new thread. In addition, it runs another thread, launched by a servlet listener at startup, which listens for the UDP packets sent by all the devices in the system.

When a client makes a request for a specific device, the REST service has to launch a thread that sends a UDP packet to the device and waits for the response. When the UDP server finally receives a packet from that device (identified by the source IP of the packet), it has to notify the blocked thread so that it can continue its execution and finish.

I thought about using the wait(), notify() and notifyAll() methods but, since many threads can be blocked waiting for responses from several devices, I don't see how I could unblock only the desired thread (the one that made the request to the responding device). Is there a way of doing this using those methods? Is there any other approach? Here is some code (simplified):

SocketServletListener:

/**
 * Servlet context listener that couples the UDP server thread to the life cycle of the
 * web application: the thread is started when the context is initialized and interrupted
 * when the context is destroyed.
 */
public class SocketServletListener implements ServletContextListener {

    private ServletContext context;
    private UDPServer server;

    /**
     * Stores the servlet context and starts a new {@link UDPServer} thread.
     *
     * @param sce event carrying the servlet context being initialized
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        this.context = sce.getServletContext();
        UDPServer udpServer = new UDPServer();
        this.server = udpServer;
        udpServer.start();
    }

    /**
     * Stores the servlet context and interrupts the UDP server thread.
     *
     * @param sce event carrying the servlet context being destroyed
     */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        this.context = sce.getServletContext();
        this.server.interrupt();
    }

}

UDPServer:

/**
 * Thread that receives UDP packets on {@code Utils.UDP_SERVER_PORT} until it is interrupted.
 *
 * <p>Waking the request thread that is waiting for the responding device is still an open
 * TODO in this version of the class.
 */
public class UDPServer extends Thread {

    private SocketUDPCommunication comm;

    public UDPServer() {
        this.comm = new SocketUDPCommunication();
    }

    /**
     * Opens the socket and processes incoming packets one at a time. The connection is
     * closed when the loop ends, whether normally or because of an exception.
     */
    @Override
    public void run() {
        try {
            comm.setPort(Utils.UDP_SERVER_PORT);
            comm.createSocket();

            while (true) {
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }

                DatagramPacket packet;
                try {
                    packet = comm.receiveResponse();
                } catch (SocketTimeoutException timeout) {
                    // Receive timed out: go round again so the interrupt flag is re-checked.
                    continue;
                }

                InetAddress senderIp = packet.getAddress();
                int senderPort = packet.getPort();
                byte[] payload = comm.discardOffset(packet);

                //TODO notify thread which made the request for the responding device (identified by senderIp)

            }
        } catch (IOException ioFailure) {
            System.err.println("Unable to process client request: " + ioFailure.getMessage());
        } catch (IllegalArgumentException badArgument) {
            System.err.println("Illegal Argument: " + badArgument.getMessage());
        } finally {
            comm.closeConnection();
        }
    }

    /**
     * Sets the interrupt flag and then closes the connection held by this server.
     */
    @Override
    public void interrupt() {
        super.interrupt();
        comm.closeConnection();
    }
}

DataSend.java:

/**
 * REST resource published under the {@code dataSend} path.
 */
@Path("dataSend")
public class DataSend {

    @Context
    private UriInfo context;

    public DataSend() {
    }

    /**
     * Starts a {@link TestExecution} thread for the posted status and answers with
     * HTTP 200 straight away, without waiting for that thread to finish.
     *
     * @param status request body, consumed as JSON
     * @return an empty response with status OK
     */
    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    public Response postJson(ForceStatus status) {
        TestExecution execution = new TestExecution(status);
        execution.start();

        Response.Status okStatus = Response.Status.OK;
        return Response.status(okStatus).build();
    }
}

TestExecution:

public class TestExecution extends Thread {
    private ForceStatus status;

    public ExamExecution(ForceStatus status) {
        this.status = status;
    }

    @Override
    public void run() {
        ProtocolStatus p = new ProtocolStatus();
        byte[] s = p.createResponseFrame(status.getForce());

        List<Integer> executedTest = new ArrayList<>();

        //Simple UDP client
        UDPClient client = new UDPClient();
        .
        .
        .
        //t is a pojo which contains the data from a battery of tests
        while(!executedTest.contains(t.getTestId())) {

            client.send(status.getIp(), status.getPort(), s);
            //TODO wait until UDPServer thread gets the response from the device

            executedTest.add(t.getTestId());

            nextTest = t.getNextTestId();

            t = getEntity(nextTest);
        }       
    }
}

Solution

  • I solved it like this:

    First I created a singleton class to manage a list of pending requests, which is shared by the different threads.

    /**
     * Singleton registry of the requests whose sender threads are blocked until the
     * matching device response arrives.
     *
     * <p>All waiting and waking goes through the monitor of the single instance. Code that
     * iterates over {@link #getQueue()} from outside this class must synchronize on that
     * same instance.
     */
    public class SharedWaitingThreads {
        // Created eagerly: class initialization is thread-safe, whereas a lazy
        // "if (instance == null)" check lets two threads racing on the first call obtain
        // two different instances, and therefore two different monitors.
        private static final SharedWaitingThreads mySharedWaitingThreads = new SharedWaitingThreads();

        private ArrayList<ResponseToWait> queue;
    
        private SharedWaitingThreads() {
            queue = new ArrayList<>();
        }
    
        /**
         * @return the single shared instance
         */
        public static SharedWaitingThreads getInstance() {
            return mySharedWaitingThreads;
        }
    
        /**
         * @return the live list of pending requests; guard access with this instance's monitor
         */
        public ArrayList<ResponseToWait> getQueue() {
            return queue;
        }
    
        /**
         * Replaces the list of pending requests.
         *
         * @param queue the new list of pending requests
         */
        public void setQueue(ArrayList<ResponseToWait> queue) {
            synchronized(this) {
                this.queue = queue;
                // Waiters re-check their condition against the new list instead of
                // sleeping on an entry that may no longer exist.
                notifyAll();
            }
        }
    
        /**
         * Registers {@code r} as pending and blocks the calling thread until an equal
         * request is removed by {@link #answerWaitingThread(ResponseToWait, boolean)}.
         *
         * @param r the request the caller is waiting on
         * @throws InterruptedException if the thread is interrupted while waiting; the
         *         request is withdrawn from the queue before the exception propagates
         */
        public void waitForAnswer(ResponseToWait r) throws InterruptedException {
            System.out.println("Petición registrada " + r.toString());
            synchronized(this) {
                queue.add(r);
                try {
                    // Loop guards against spurious wake-ups and against notifyAll()
                    // calls that were meant for a different request.
                    while(queue.contains(r)) {
                        wait();
                    }
                } catch (InterruptedException ex) {
                    // Without this the abandoned entry would stay queued forever.
                    removeSameInstance(r);
                    throw ex;
                }
            }
        }
    
        /**
         * Removes the first pending request equal to {@code r} and wakes every waiting
         * thread so that the owner of that request can resume.
         *
         * @param r the response to match against pending requests (via {@code equals})
         * @param compareSeqNum currently unused by this implementation
         * @return the pending request that was removed, or {@code null} if none matched
         */
        public ResponseToWait answerWaitingThread(ResponseToWait r, boolean compareSeqNum) {
            synchronized(this) {
                // Index-based removal: takes out exactly the element that matched and
                // never modifies the list from inside an iterator.
                for(int i = 0; i < queue.size(); i++) {
                    if(queue.get(i).equals(r)) {
                        ResponseToWait answered = queue.remove(i);
                        //every time a thread is released, notify to release the lock
                        notifyAll();
                        return answered;
                    }
                }
            }
            return null;
        }

        /** Removes exactly the given object (compared by identity); caller holds the monitor. */
        private void removeSameInstance(ResponseToWait r) {
            for(int i = 0; i < queue.size(); i++) {
                if(queue.get(i) == r) {
                    queue.remove(i);
                    return;
                }
            }
        }
    }
    

    This singleton instance is created by the main thread (in contextInitialized) and is shared by all the threads that need to wait for a response before continuing their work. ResponseToWait contains all the information needed for each request/thread. Its equals method is overridden to provide the required matching (in my case, I compare by IP and type of request).

    /**
     * Worker that sends frames to a device through a {@code UDPClient} and, after each
     * successful send, blocks on {@code SharedWaitingThreads} until the UDP server
     * answers the matching request.
     *
     * <p>NOTE(review): this listing is simplified. {@code status}, {@code getIp()},
     * {@code getPort()}, {@code executedTest}, {@code testInExam}, {@code pc}, {@code daot},
     * {@code res}, {@code seqNumber}, {@code nextTest} and {@code testId} are used below but
     * are not declared in the visible code.
     */
    public class ExamExecution extends Thread {
    
        private SharedWaitingThreads waitingThreads;
        // Static, so shared by every instance: each call to start() overwrites it and
        // endExecution() clears it for all of them.
        private static volatile Thread myThread;
    
        /**
         * @param waitingThreads registry used to block until a device response arrives
         */
        public ExamExecution(SharedWaitingThreads waitingThreads) {
            this.waitingThreads = waitingThreads;
        }
    
        /**
         * Wraps this object in a new {@link Thread} (using it as the Runnable), records
         * that thread in {@code myThread} and starts it. {@link #run()} therefore executes
         * on the wrapper thread, not on this Thread object itself.
         */
        @Override
        public void start() {
            myThread = new Thread(this);
            myThread.start();
        }
    
        /**
         * Dispatches on the requested mode: continue an execution, start the chain of
         * tests, or end the running execution.
         */
        @Override
        public void run() {
            // Not used in the visible code; may be used in the elided sections.
            Thread thisThread = Thread.currentThread();
            ProtocolStatus p = new ProtocolStatus();
            UDPClient client = new UDPClient();
            if(status.getWorkingMode() == WorkingMode.CONTINUE_EXECUTION) {
                byte[] frameRestart = p.createResponseFrame(status.getWorkingMode());
                client.send(getIp(), getPort(), frameRestart);
                //send the frame and block the thread until the server gets the proper response
                try {
                    waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_STATUS, frameRestart));
                } catch (InterruptedException ex) {
                    // The interruption is only logged; execution continues afterwards.
                    Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
                }
            }else
            if(status.getForce() == ForceStatus.START) {                
                //get data from database and initialize variables       
                .
                .
                .
    
                // Runs until the current test has already been executed or until
                // endExecution() has set myThread to null.
                while(!executedTest.contains(testInExam.getTestId()) && myThread != null) {
                    int attempts = 0;
                    res = false;
                    seqNumber = this.seqNumber.getValue();
                    // Up to three attempts to send the config frame; the loop ends as soon
                    // as one send reports success (res == true).
                    while(!res && (attempts < 3)) {
                        TestMemoryMap map = new TestMemoryMap(testInExam.getTestId());
                        byte[] frameConfig = pc.createConfigFrame(Utils.ID_RTU, (byte)1, (byte)0, 
                            Utils.MEM_MAP_VERSION, (byte)0, map.getMemoryMap().length, seqNumber, map.getMemoryMap());
    
                        res = client.send(getIp(), getPort(), frameConfig);
    
                        if(res) {
                            // Frame was sent: block until the server answers the config request.
                            try {
                                System.out.println(Thread.currentThread().getName() + " blocked waiting config answer");
                                waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_CONFIG, frameConfig));
                            } catch (InterruptedException ex) {
                                Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
                            }
                        }
                        attempts++;
                    }
                    System.out.println("Config frame received:" + res);
    
                    if(res) {
                        // Config was sent: send the status frame and block until it is answered.
                        byte[] frame = p.createResponseFrame(status.getWorkingMode());
                        client.send(getIp(), getPort(), frame);
    
                        try {
                            System.out.println(Thread.currentThread().getName() + " blocked waiting end execution answer");
                            waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_STATUS, frame));
                        } catch (InterruptedException ex) {
                            Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
                        }               
                    }
                    //add test to list of executed tests
                    executedTest.add(testInExam.getTestId());
                    nextTest = testInExam.getNextTestInExamId();
                    // When nextTest is 0, testInExam is left unchanged, so the loop
                    // condition finds it in executedTest and the loop ends.
                    if(nextTest != 0) {
                        testInExam = daot.getEntity(nextTest);
                        testId = testInExam.getTestId();
                    }
                }
            } else if(status.getForce() == ForceStatus.END) {
                System.out.println("Stopping...");
                //abort the execution of the thread
                this.endExecution();
    
            }
        }
    
        /**
         * Clears the shared {@code myThread} reference, which makes the test loop in
         * {@link #run()} stop at its next condition check.
         */
        private void endExecution() {
            // NOTE(review): synchronized(myThread) throws NullPointerException if myThread
            // is already null, and the monitor object changes on every start() — confirm
            // this lock is what was intended.
            synchronized(myThread) {
                this.myThread = null;
            }   
        }
    }
    

    The UDP server thread then has to wake the specific waiting threads, depending on the response received:

    /**
     * Thread that receives UDP packets from the devices and, depending on the frame type,
     * resends pending requests or wakes the thread waiting on the matching request in
     * {@code SharedWaitingThreads}.
     */
    public class UDPServer extends Thread {
    
        private SocketUDPCommunication comm;
        private UDPClient udpClient;
        private SharedWaitingThreads waitingThreads;
    
        /**
         * @param waitingThreads registry of blocked request threads to be answered
         */
        public UDPServer(SharedWaitingThreads waitingThreads) {
            comm = new SocketUDPCommunication();
            udpClient = new UDPClient();
            this.waitingThreads = waitingThreads;
        }
    
    
        /**
         * Opens the socket on {@code Utils.UDP_SERVER_PORT} and handles incoming packets
         * until the thread is interrupted. The connection is closed on exit.
         */
        @Override
        public void run() {
            DatagramPacket response;
            try {
                comm.setPort(Utils.UDP_SERVER_PORT);
                comm.createSocket();
    
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println("Waiting for clients to connect on port:" + comm.getSocket().getLocalPort());
                    try {
                        response = comm.receiveResponse();
                    } catch (SocketTimeoutException e) {
                        // Receive timed out: loop again so the interrupt flag is re-checked.
                        continue;
                    }                           
                    InetAddress ip = response.getAddress();
                    int port = response.getPort();
    
                    byte[] byteSend = comm.discardOffset(response);
    
                    byte[] header = new byte[Utils.STD_HEADER_SIZE];
                    Utils.getCleanHeader(byteSend, header);
                    // Byte 12 of the header is used as the frame type below.
                    byte type = header[12];
    
                    ResponseToWait r1;
                    if(type == Utils.TYPE_CONFIG_REPORT) {
                        ProtocolConfig pc = new ProtocolConfig();
                        pc.parseFrame(byteSend);
                        // mapType and idElement are not used in the visible code.
                        int mapType = pc.getPayload()[0];
                        int idElement = pc.getPayload()[1];
                        r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_CONFIG, null);
                        // NOTE(review): checkPendingRequests returns true whenever a CONFIG or
                        // STATUS request from this IP is queued, in which case the waiting
                        // thread is not answered for this packet — confirm against the real code.
                        if(checkPendingRequests(r1, null)) {
                            System.out.println("Resending config");
                            continue;
                        }
                        waitingThreads.answerWaitingThread(r1, true);
                    }else if(type == Utils.TYPE_STATUS_REPORT) {
                        ProtocolStatus protocol = new ProtocolStatus();
    
                        // NOTE(review): three-argument constructor here, four arguments
                        // everywhere else in this listing.
                        r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_STATUS);
                        // NOTE(review): statusTest is not declared or assigned in the visible
                        // code; presumably parsed from the frame in the real implementation.
                        if(checkPendingRequests(r1, statusTest)) continue;
                        byte[] frame;
                        if(statusTest.equals(StatusTest.FINALIZED)) {
                            System.out.println("Test finalized. Waking threads");
                            r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_STATUS, null);
                            //Free possible waiting threads
                            ResponseToWait res1 = waitingThreads.answerWaitingThread(r1, false);
    
                        }
                    }
                }
            } catch (IOException e) {
                System.err.println("Unable to process client request: " + e.getMessage());
            } catch (IllegalArgumentException ex) {
                System.err.println("Illegal Argument: " + ex.getMessage());
            } catch (InterruptedException ex) {
                // NOTE(review): no call in the visible try body declares
                // InterruptedException — confirm which call throws it.
                Logger.getLogger(UDPServer.class.getName()).log(Level.SEVERE, null, ex);
            } finally {
                comm.closeConnection();
            }
        }
    
        /**
         * Resends the stored frame of every queued CONFIG or STATUS request whose IP
         * equals that of {@code rw}. The scan runs while holding the monitor of
         * {@code waitingThreads}.
         *
         * @param rw response just received; only its IP is compared
         * @param status only printed; not used in the matching
         * @return {@code true} if at least one frame was resent
         */
        private boolean checkPendingRequests(ResponseToWait rw, StatusTest status) {
            boolean resend = false;
            System.out.println("Status: " + status);
            synchronized(waitingThreads) {
                for(ResponseToWait r : waitingThreads.getQueue()) {
                    if(r.getResponseType() == Utils.TYPE_CONFIG && r.getIp().equals(rw.getIp())) {
                        udpClient.send(r.getIp(), r.getPort(), r.getFrame());
                        resend = true;
                    }
                    if(r.getResponseType() == Utils.TYPE_STATUS && r.getIp().equals(rw.getIp())){
                        udpClient.send(r.getIp(), r.getPort(), r.getFrame());
                        resend = true;  
                    }
                }
            }
            return resend;
        }
    
        /**
         * Sets the interrupt flag and closes the connection; presumably the close is what
         * releases a receive call that is currently blocked — confirm against
         * SocketUDPCommunication.
         */
        @Override
        public void interrupt() {
            super.interrupt();
            comm.closeConnection();
        }
    
    }
    

    Note that I have simplified the code to make it clearer and more didactic; the real case is considerably more complex.