This problem came up while running a socket server built on a producer/consumer design. The program crashed with a "CPU time limit exceeded" error in the log, and CPU usage was above 90% at the time. Here is the server code. What could be going wrong, and how can I optimize it?
I used this queue approach to avoid creating a new thread for each request.
In the main method (main thread)
//holds socket instances
ConcurrentLinkedQueue<Socket> queue = new ConcurrentLinkedQueue<>();
//create producer thread
Thread producer = new Thread(new RequestProducer(queue));
//create consumer thread
Thread consumer = new Thread(new RequestConsumer(queue));
producer.start();
consumer.start();
RequestProducer thread
//this holds queue instance coming from main thread
ConcurrentLinkedQueue<Socket> queue;
//constructor, initiate queue
public RequestProducer(ConcurrentLinkedQueue<Socket> queue) {
    this.queue = queue;
}
public void run() {
    try {
        //create serversocket instance on port 19029
        ServerSocket serverSocket = new ServerSocket(19029);
        while (true) {
            try {
                //keep accepting connections
                Socket socket = serverSocket.accept();
                //add socket to queue
                queue.offer(socket);
            } catch (ConnectException ce) {
                //handle exception
            } catch (SocketException e) {
                //handle exception
            }
        }
    } catch (IOException ex) {
        //handle exception
    }
}
RequestConsumer thread
//this holds queue instance coming from main thread, same as RequestProducer
ConcurrentLinkedQueue<Socket> queue;
//constructor, initiate queue
public RequestConsumer(ConcurrentLinkedQueue<Socket> queue) {
    this.queue = queue;
}
public void run() {
    try {
        Socket socket = null;
        while (true) {
            //get head of the queue (socket instance)
            socket = queue.poll();
            if (null != socket) {
                //process data stream
                String in = DataStreamUtil.parseAsciiSockStream(socket.getInputStream());
                //close socket connection
                socket.close();
                //execute database insert of processed data
                excecuteDbInsert(in);
            }
        }
    } catch (IOException | ParseException ex) {
        //handle exceptions
    }
}
Data stream parser
public static String parseAsciiSockStream(InputStream in) throws IOException {
    StringBuilder builder = new StringBuilder();
    if (null != in) {
        byte[] b = new byte[BYTE_STREAM_MAX];
        int length = in.read(b);
        for (int i = 0; i < length; i++) {
            builder.append((char) (int) b[i]);
        }
        in.close();
    }
    return builder.toString();
}
The CPU time limit is exceeded because of the aggressive while (true) loop in your consumer: when the queue is empty, queue.poll() returns null immediately, so the thread spins at full speed without doing any work.
To limit CPU consumption, you can add a simple Thread.sleep(1) to the consumer's while loop or use the wait/notify pattern. The classes below use wait/notify.
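The simpler Thread.sleep(1) option could look roughly like the sketch below. It is only an illustration: the class name SleepingConsumerSketch and the drain method are hypothetical, and plain strings stand in for the Socket instances so that it runs without a network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: a polling consumer that sleeps while the queue is empty.
class SleepingConsumerSketch {

    // Takes `expected` items from the queue, sleeping 1 ms whenever poll() finds nothing.
    static List<String> drain(ConcurrentLinkedQueue<String> queue, int expected)
            throws InterruptedException {
        List<String> handled = new ArrayList<>();
        while (handled.size() < expected) {
            String item = queue.poll();
            if (item == null) {
                // Nothing to do yet: give up the CPU instead of spinning.
                Thread.sleep(1);
                continue;
            }
            handled.add(item);
        }
        return handled;
    }

    public static void main(String[] args) throws Exception {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        // Stand-in for the producer thread that accepts connections.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                queue.offer("request-" + i);
            }
        });
        producer.start();
        System.out.println(drain(queue, 3)); // prints [request-0, request-1, request-2]
        producer.join();
    }
}
```

The trade-off is a small added latency (up to the sleep interval) per idle check, in exchange for a consumer that no longer keeps a core busy while idle.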
RequestProducer thread
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;

public class RequestProducer implements Runnable {
    //this holds queue instance coming from main thread
    final ConcurrentLinkedQueue<Socket> queue;

    //constructor, initiate queue
    public RequestProducer(ConcurrentLinkedQueue<Socket> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            //create serversocket instance on port 19029
            ServerSocket serverSocket = new ServerSocket(19029);
            while (true) {
                try {
                    //keep accepting connections
                    Socket socket = serverSocket.accept();
                    //add socket to queue
                    queue.offer(socket);
                    //wake up a consumer waiting on the queue's monitor
                    synchronized (queue) {
                        System.out.println("notifying");
                        queue.notify();
                    }
                } catch (ConnectException ce) {
                    //handle exception
                } catch (SocketException e) {
                    //handle exception
                }
            }
        } catch (IOException ex) {
            //handle exception
        }
    }
}
RequestConsumer thread
import java.io.IOException;
import java.net.Socket;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;

public class RequestConsumer implements Runnable {
    //this holds queue instance coming from main thread, same as RequestProducer
    final ConcurrentLinkedQueue<Socket> queue;

    //constructor, initiate queue
    public RequestConsumer(ConcurrentLinkedQueue<Socket> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            while (true) {
                Socket socket;
                synchronized (queue) {
                    //wait only while the queue is empty; the loop guards against
                    //notifications sent before wait() and against spurious wakeups
                    while (queue.isEmpty()) {
                        System.out.println("Waiting for new socket");
                        queue.wait();
                    }
                    //get head of the queue (socket instance)
                    socket = queue.poll();
                }
                System.out.println("Acquired new socket");
                try {
                    //process data stream
                    String in = DataStreamUtil.parseAsciiSockStream(socket.getInputStream());
                    //execute database insert of processed data
                    //excecuteDbInsert(in);
                    System.out.println(in);
                } finally {
                    //close socket connection
                    socket.close();
                }
            }
        } catch (IOException ex) {
            //handle exceptions
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
Data stream parser
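import java.io.IOException;
import java.io.InputStream;

public class DataStreamUtil {
    //maximum number of bytes read per request; this value is an assumption,
    //the original constant is not shown in the question
    private static final int BYTE_STREAM_MAX = 4096;

    public static String parseAsciiSockStream(InputStream in) throws IOException {
        StringBuilder builder = new StringBuilder();
        if (null != in) {
            byte[] b = new byte[BYTE_STREAM_MAX];
            System.out.println("Waiting for input");
            //a single read may return fewer bytes than the client sent, or -1 at end of stream
            int length = in.read(b);
            System.out.println("Got input");
            for (int i = 0; i < length; i++) {
                builder.append((char) (int) b[i]);
            }
            in.close();
        }
        return builder.toString();
    }
}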
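As an alternative to hand-written wait/notify (this swaps in a different technique from the one shown above), the JDK's java.util.concurrent.BlockingQueue already blocks the consumer while the queue is empty. The sketch below is only an illustration under assumptions: the class name BlockingQueueSketch, the consume method and the STOP sentinel are hypothetical, and strings stand in for the Socket instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: the consumer blocks in take() instead of polling or calling wait().
class BlockingQueueSketch {

    // Hypothetical sentinel that tells the consumer to stop; not part of the original code.
    static final String STOP = "STOP";

    // Handles items until the STOP sentinel arrives; take() parks the thread while the queue is empty.
    static List<String> consume(BlockingQueue<String> queue) throws InterruptedException {
        List<String> handled = new ArrayList<>();
        while (true) {
            String item = queue.take();
            if (STOP.equals(item)) {
                return handled;
            }
            handled.add(item);
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Stand-in for the producer thread that accepts connections.
        Thread producer = new Thread(() -> {
            queue.offer("request-0");
            queue.offer("request-1");
            queue.offer(STOP);
        });
        producer.start();
        System.out.println(consume(queue)); // prints [request-0, request-1]
        producer.join();
    }
}
```

With this approach the producer only needs to call queue.offer(socket) (or put), and no synchronized block or notify call is required in either class.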