Search code examples
java · multithreading · sockets · concurrency · producer-consumer

TCP socket server with consumer-producer design 'cpu time limit exceeded'


This problem came up while running a socket server built on a consumer/producer design: the program crashed with the error "cpu time limit exceeded" in the log, and I found that CPU usage was above 90% at the time. Here is the code of the server. What could be going wrong with it, and how can I optimize it?

I used this queue approach to avoid creating a new thread for each request.

in main method (main thread)

// shared hand-off queue: accepted client sockets go in, the consumer takes them out
ConcurrentLinkedQueue<Socket> queue = new ConcurrentLinkedQueue<>();

// one task accepts connections, the other processes them
Runnable acceptTask = new RequestProducer(queue);
Runnable processTask = new RequestConsumer(queue);

// wrap each task in its own thread
Thread producer = new Thread(acceptTask);
Thread consumer = new Thread(processTask);

// start accepting first, then processing
producer.start();
consumer.start();

RequestProducer thread

//queue shared with the consumer; the instance is created in the main thread and passed in
ConcurrentLinkedQueue<Socket> queue

//constructor: stores the shared queue reference
public RequestProducer(
    ConcurrentLinkedQueue<Socket> queue
) {
    this.queue = queue;
}

/**
 * Accept loop of the producer thread.
 *
 * Opens a ServerSocket on port 19029 and, in an endless loop, offers every
 * accepted client socket to the shared queue. ConnectException and
 * SocketException thrown inside the loop are caught and ignored, so the loop
 * keeps running; any other IOException leaves the loop through the outer catch.
 *
 * NOTE(review): the ServerSocket is never closed anywhere in this method.
 */
public void run() {
    try {
        //create serversocket instance on port 19029
        ServerSocket serverSocket = new ServerSocket(19029);
        while (true) {
            try {
                //blocks until a client connects
                Socket socket = serverSocket.accept();
                //hand the connection over to the consumer via the queue
                queue.offer(socket);
            } catch (ConnectException ce) {//handle exception
            } catch (SocketException e) {//handle exception
            }
        }
    } catch (IOException ex) {//handle exception}
    //NOTE(review): the closing brace on the line above is inside the // comment,
    //so the brace below closes the catch block and run() itself is left unclosed in this snippet
}

RequestConsumer thread

//queue shared with the producer; the same instance that is passed to RequestProducer
ConcurrentLinkedQueue<Socket> queue

//constructor: stores the shared queue reference
public RequestConsumer(
    ConcurrentLinkedQueue<Socket> queue
) {
    this.queue = queue;
}

/**
 * Processing loop of the consumer thread.
 *
 * Repeatedly polls the shared queue; for every socket obtained it parses the
 * socket's input stream into a String, closes the socket, and passes the
 * String to excecuteDbInsert (defined outside this snippet).
 *
 * NOTE(review): ConcurrentLinkedQueue.poll() does not block - it returns null
 * when the queue is empty - and the loop has no sleep or wait, so while no
 * connection is queued this loop iterates continuously.
 * NOTE(review): the catch sits outside the while loop, so the first
 * IOException or ParseException ends the loop; the socket is not closed when
 * parsing throws.
 */
public void run() {
    try {
        Socket socket = null;
        while (true) {
            //take the head of the queue (null when the queue is empty)
            socket = queue.poll();
            if (null != socket) {
                //read the request payload from the connection
                String in = DataStreamUtil.parseAsciiSockStream(socket.getInputStream());
                //close the connection before the database work starts
                socket.close();
                //insert the parsed data into the database
                excecuteDbInsert(in);
            }
        }
    } catch (IOException | ParseException ex) {//handle exceptions}
    //NOTE(review): the closing brace on the line above is inside the // comment,
    //so the brace below closes the catch block and run() itself is left unclosed in this snippet
}

Data stream parser

/**
 * Reads one chunk of bytes from the given stream and returns it as text,
 * mapping each byte to the character with the same unsigned value.
 *
 * At most BYTE_STREAM_MAX bytes are consumed, using a single read() call;
 * the stream is always closed afterwards, even when the read fails.
 *
 * @param in stream to read from; may be null
 * @return the decoded text, or an empty string when {@code in} is null or
 *         the stream is already at end-of-stream
 * @throws IOException if reading from or closing the stream fails
 */
public static String parseAsciiSockStream(InputStream in) throws IOException {
    StringBuilder builder = new StringBuilder();
    if (null == in) {
        return builder.toString();
    }
    // try-with-resources closes the stream even if read() throws
    try (InputStream stream = in) {
        byte[] b = new byte[BYTE_STREAM_MAX];
        // read() returns -1 at end-of-stream, in which case the loop below is skipped
        int length = stream.read(b);
        for (int i = 0; i < length; i++) {
            // mask to the unsigned value: a plain (char) cast sign-extends
            // bytes >= 0x80 into U+FF80..U+FFFF
            builder.append((char) (b[i] & 0xFF));
        }
    }
    return builder.toString();
}

Solution

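  • The CPU time limit is exceeded because of the aggressive while(true) loop in your Consumer: when the queue is empty, poll() returns null immediately, so the loop spins at full speed without ever blocking. Below is an example of how you can solve the problem.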

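    You can add a simple Thread.sleep(1) to the while loop in the Consumer, or use the wait/notify pattern to limit CPU consumption. A BlockingQueue (for example LinkedBlockingQueue with take()) gives the same blocking behaviour without explicit wait/notify.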

    RequestProducer thread

    import java.io.IOException;
    import java.net.ConnectException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketException;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * Producer side of the server: accepts TCP connections on port 19029 and
     * hands each accepted socket to the consumer through the shared queue.
     *
     * After every offer the producer calls notify() on the queue object, which
     * is also used as the monitor the consumer waits on.
     */
    public class RequestProducer implements Runnable {
        //queue shared with the consumer; also used as the wait/notify monitor
        final ConcurrentLinkedQueue<Socket> queue;

        /**
         * @param queue queue instance created by the main thread and shared with the consumer
         */
        public RequestProducer(
                ConcurrentLinkedQueue<Socket> queue
        ) {
            this.queue = queue;
        }

        /**
         * Runs the accept loop until the server socket fails with a
         * non-socket I/O error. Socket-level errors on a single accept are
         * reported and the loop continues.
         */
        @Override
        public void run() {
            //try-with-resources releases the listening port when the loop ends
            try (ServerSocket serverSocket = new ServerSocket(19029)) {
                while (true) {
                    try {
                        //blocks until a client connects
                        Socket socket = serverSocket.accept();
                        //publish the connection before waking the consumer
                        queue.offer(socket);
                        //print outside the monitor so the lock is held only for notify()
                        System.out.println("notifying");
                        synchronized (queue) {
                            queue.notify();
                        }
                    } catch (SocketException e) {
                        //ConnectException is a SocketException, so this handler covers both;
                        //report instead of swallowing and keep accepting
                        System.err.println("Accept failed: " + e);
                    }
                }
            } catch (IOException ex) {
                //cannot bind or accept any more: report and let the thread end
                System.err.println("Request producer stopped: " + ex);
            }
        }
    }
    

    RequestConsumer thread

    import java.io.IOException;
    import java.net.Socket;
    import java.text.ParseException;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * Consumer side of the server: takes accepted sockets from the shared
     * queue, reads the request payload from each one and prints it.
     *
     * The queue object is also the monitor the producer calls notify() on.
     * The consumer only waits while the queue is actually empty, so a
     * notification sent while it is busy with a previous connection cannot be
     * lost, and spurious wake-ups are harmless.
     */
    public class RequestConsumer implements Runnable {
        //queue shared with the producer; also used as the wait/notify monitor
        final ConcurrentLinkedQueue<Socket> queue;

        /**
         * @param queue queue instance created by the main thread and shared with the producer
         */
        public RequestConsumer(
                ConcurrentLinkedQueue<Socket> queue
        ) {
            this.queue = queue;
        }

        /**
         * Processes queued connections until the thread is interrupted.
         * A failure on one connection is reported and does not stop the loop.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println("Waiting for new socket");
                    Socket socket;
                    synchronized (queue) {
                        //guarded wait: re-check the queue after every wake-up and
                        //never wait while a socket is already queued
                        while ((socket = queue.poll()) == null) {
                            queue.wait();
                        }
                    }
                    System.out.println("Acquired new socket");

                    process(socket);
                }
            } catch (InterruptedException e) {
                //restore the flag so whoever owns the thread can see the interruption
                Thread.currentThread().interrupt();
            }
        }

        //Reads the payload of one connection, closes it, then handles the data.
        private void process(Socket socket) {
            String in;
            //try-with-resources closes the socket exactly once, even if parsing throws
            try (Socket connection = socket) {
                in = DataStreamUtil.parseAsciiSockStream(connection.getInputStream());
            } catch (IOException ex) {
                //one broken client must not take the whole consumer down
                System.err.println("Failed to read from connection: " + ex);
                return;
            }
            //the connection is already closed here; the database insert of the
            //processed data would go at this point
            //excecuteDbInsert(in);

            System.out.println(in);
        }

    }
    

    Data stream parser

    import java.io.IOException;
    import java.io.InputStream;
    
    /**
     * Helper for turning the raw bytes of a socket stream into text.
     */
    public class DataStreamUtil {
        //Upper bound on the number of bytes consumed per call.
        //NOTE(review): the original snippet used this name without declaring it;
        //8192 is a placeholder - replace it with the project's real limit.
        private static final int BYTE_STREAM_MAX = 8192;

        /**
         * Reads one chunk of bytes from the given stream and returns it as
         * text, mapping each byte to the character with the same unsigned value.
         *
         * At most BYTE_STREAM_MAX bytes are consumed, using a single read()
         * call; the stream is always closed afterwards, even when the read fails.
         *
         * @param in stream to read from; may be null
         * @return the decoded text, or an empty string when {@code in} is null
         *         or the stream is already at end-of-stream
         * @throws IOException if reading from or closing the stream fails
         */
        public static String parseAsciiSockStream(InputStream in) throws IOException {
            StringBuilder builder = new StringBuilder();
            if (null == in) {
                return builder.toString();
            }
            //try-with-resources closes the stream even if read() throws
            try (InputStream stream = in) {
                byte[] b = new byte[BYTE_STREAM_MAX];
                System.out.println("Waiting for input");
                //read() returns -1 at end-of-stream, in which case the loop below is skipped
                int length = stream.read(b);
                System.out.println("Got input");
                for (int i = 0; i < length; i++) {
                    //mask to the unsigned value: a plain (char) cast sign-extends
                    //bytes >= 0x80 into U+FF80..U+FFFF
                    builder.append((char) (b[i] & 0xFF));
                }
            }
            return builder.toString();
        }
    }