I'm not a very experienced Java programmer, so forgive me if this is a bit of a newbie question.
I'm designing a system that consists broadly of three modules: a client, a server and an application. The idea is that the client sends a message to the server, the server triggers a use case in the application, the result of the use case is returned to the server, and the server sends the result on to the client. I opted for this architecture because I expect to need to support multiple clients at once, I want to be able to reuse the server module in other applications, I want to keep the code that manages client connections as decoupled as possible from the code that implements the domain logic, and because it's an opportunity to learn some more advanced Java.
I'm planning to tie the various modules together with queues. The client is simple enough. Issue a message and block until a response arrives (it may be oversimplifying but it's a reasonable model for now). The application is equally not a problem. It blocks on its input queue, executes a use case when it receives a valid message and pushes the results to an output queue. Having multiple clients makes things a bit more tricky but still within my grasp with my experience level. The server maintains threads for every open connection, and the server outbound/application inbound queue is synchronised, so if 2 messages arrive at once the second thread will just have to wait a moment for the first thread to deliver its payload into the queue.
The problem is the part in the middle, the server, which will have to block on two independent things. The server is watching both the client, and the application's output queue (which serves as an input queue for the server). The server needs to block until either a message comes in from the client (which it then forwards to the application), or until the application completes a task and pushes the results into the application outbound/server inbound queue.
As far as I can tell, Java can only block on one thing.
Is it possible to have the server block until either the client sends a message or the server inbound queue ceases to be empty?
UPDATE:
I've had a bit of time to work on this, and have managed to pare the problem down to the bare minimum that illustrates the problem. There's a somewhat bulky code dump to follow, even with the trimming, so apologies for that. I'll try to break it up as much as possible.
This is the code for the Server:
public class Server implements Runnable {
private int listenPort = 0;
private ServerSocket serverSocket = null;
private BlockingQueue<Message> upstreamMessaes = null;
private BlockingQueue<Message> downstreamMessages = null;
private Map<Integer, Session> sessions = new ConcurrentHashMap<> ();
private AtomicInteger lastId = new AtomicInteger ();
/**
* Start listening for clients to process
*
* @throws IOException
*/
@Override
public void run () {
int newSessionId;
Session newSession;
Thread newThread;
System.out.println (this.getClass () + " running");
// Client listen loop
while (true) {
newSessionId = this.lastId.incrementAndGet ();
try {
newSession = new Session (this, newSessionId);
newThread = new Thread (newSession);
newThread.setName ("ServerSession_" + newSessionId);
this.sessions.put (newSessionId, newSession);
newThread.start ();
} catch (IOException ex) {
Logger.getLogger (Server.class.getName ()).log (Level.SEVERE, null, ex);
}
}
}
/**
* Accept a connection from a new client
*
* @return The accepted Socket
* @throws IOException
*/
public Socket accept () throws IOException {
return this.getSocket().accept ();
}
/**
* Delete the specified Session
*
* @param sessionId ID of the Session to remove
*/
public void deleteSession (int sessionId) {
this.sessions.remove (sessionId);
}
/**
* Forward an incoming message from the Client to the application
*
* @param msg
* @return
* @throws InterruptedException
*/
public Server messageFromClient (Message msg) throws InterruptedException {
this.upstreamMessaes.put (msg);
return this;
}
/**
* Set the port to listen to
*
* We can only use ports in the range 1024-65535 (ports below 1024 are
* reserved for common protocols such as HTTP and ports above 65535 don't
* exist)
*
* @param listenPort
* @return Returns itself so further methods can be called
* @throws IllegalArgumentException
*/
public final Server setListenPort (int listenPort) throws IllegalArgumentException {
if ((listenPort > 1023) && (listenPort <= 65535)) {
this.listenPort = listenPort;
} else {
throw new IllegalArgumentException ("Port number " + listenPort + " not valid");
}
return this;
}
/**
* Get the server socket, initialize it if it isn't already started.
*
* @return The object's ServerSocket
* @throws IOException
*/
private ServerSocket getSocket () throws IOException {
if (null == this.serverSocket) {
this.serverSocket = new ServerSocket (this.listenPort);
}
return this.serverSocket;
}
/**
* Instantiate the server
*
* @param listenPort
* @throws IllegalArgumentException
*/
public Server ( int listenPort,
BlockingQueue<Message> incomingMessages,
BlockingQueue<Message> outgoingMessages) throws IllegalArgumentException {
this.setListenPort (listenPort);
this.upstreamMessaes = incomingMessages;
this.downstreamMessages = outgoingMessages;
System.out.println (this.getClass () + " created");
System.out.println ("Listening on port " + listenPort);
}
}
I believe the following method belongs in the Server but is currently commented out.
/**
* Notify a Session of a message for it
*
* @param sessionMessage
*/
public void notifySession () throws InterruptedException, IOException {
Message sessionMessage = this.downstreamMessages.take ();
Session targetSession = this.sessions.get (sessionMessage.getClientID ());
targetSession.waitForServer (sessionMessage);
}
This is my Session class
public class Session implements Runnable {
private Socket clientSocket = null;
private OutputStreamWriter streamWriter = null;
private StringBuffer outputBuffer = null;
private Server server = null;
private int sessionId = 0;
/**
* Session main loop
*/
@Override
public void run () {
StringBuffer inputBuffer = new StringBuffer ();
BufferedReader inReader;
try {
// Connect message
this.sendMessageToClient ("Hello, you are client " + this.getId ());
inReader = new BufferedReader (new InputStreamReader (this.clientSocket.getInputStream (), "UTF8"));
do {
// Parse whatever was in the input buffer
inputBuffer.delete (0, inputBuffer.length ());
inputBuffer.append (inReader.readLine ());
System.out.println ("Input message was: " + inputBuffer);
this.server.messageFromClient (new Message (this.sessionId, inputBuffer.toString ()));
} while (!"QUIT".equals (inputBuffer.toString ()));
// Disconnect message
this.sendMessageToClient ("Goodbye, client " + this.getId ());
} catch (IOException | InterruptedException e) {
Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
} finally {
this.terminate ();
this.server.deleteSession (this.getId ());
}
}
/**
*
* @param msg
* @return
* @throws IOException
*/
public Session waitForServer (Message msg) throws IOException {
// Generate a response for the input
String output = this.buildResponse (msg.getPayload ()).toString ();
System.out.println ("Output message will be: " + output);
// Output to client
this.sendMessageToClient (output);
return this;
}
/**
*
* @param request
* @return
*/
private StringBuffer buildResponse (CharSequence request) {
StringBuffer ob = this.outputBuffer;
ob.delete (0, this.outputBuffer.length ());
ob.append ("Server responded at ")
.append (new java.util.Date ().toString () )
.append (" (You said '" )
.append (request)
.append ("')");
return this.outputBuffer;
}
/**
* Send the given message to the client
*
* @param message
* @throws IOException
*/
private void sendMessageToClient (CharSequence message) throws IOException {
// Output to client
OutputStreamWriter osw = this.getStreamWriter ();
osw.write (message.toString ());
osw.write ("\r\n");
osw.flush ();
}
/**
* Get an output stream writer, initialize it if it's not active
*
* @return A configured OutputStreamWriter object
* @throws IOException
*/
private OutputStreamWriter getStreamWriter () throws IOException {
if (null == this.streamWriter) {
BufferedOutputStream os = new BufferedOutputStream (this.clientSocket.getOutputStream ());
this.streamWriter = new OutputStreamWriter (os, "UTF8");
}
return this.streamWriter;
}
/**
* Terminate the client connection
*/
private void terminate () {
try {
this.streamWriter = null;
this.clientSocket.close ();
} catch (IOException e) {
Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
}
}
/**
* Get this Session's ID
*
* @return The ID of this session
*/
public int getId () {
return this.sessionId;
}
/**
* Session constructor
*
* @param owner The Server object that owns this session
* @param sessionId The unique ID this session will be given
* @throws IOException
*/
public Session (Server owner, int sessionId) throws IOException {
System.out.println ("Class " + this.getClass () + " created");
this.server = owner;
this.sessionId = sessionId;
this.clientSocket = this.server.accept ();
System.out.println ("Session ID is " + this.sessionId);
}
}
The test application is fairly basic: it just echoes a modified version of the original request message back. The real application will do work on receipt of a message and return a meaningful response to the Server.
public class TestApp implements Runnable {
private BlockingQueue <Message> inputMessages, outputMessages;
@Override
public void run () {
Message lastMessage;
StringBuilder returnMessage = new StringBuilder ();
while (true) {
try {
lastMessage = this.inputMessages.take ();
// Construct a response
returnMessage.delete (0, returnMessage.length ());
returnMessage.append ("Server responded at ")
.append (new java.util.Date ().toString () )
.append (" (You said '" )
.append (lastMessage.getPayload ())
.append ("')");
// Pretend we're doing some work that takes a while
Thread.sleep (1000);
this.outputMessages.put (new Message (lastMessage.getClientID (), returnMessage.toString ()));
} catch (InterruptedException ex) {
Logger.getLogger (TestApp.class.getName ()).log (Level.SEVERE, null, ex);
}
}
}
/**
* Initialize the application
*
* @param inputMessages Where input messages come from
* @param outputMessages Where output messages go to
*/
public TestApp (BlockingQueue<Message> inputMessages, BlockingQueue<Message> outputMessages) {
this.inputMessages = inputMessages;
this.outputMessages = outputMessages;
System.out.println (this.getClass () + " created");
}
}
The Message class is extremely simple and just consists of an originating client ID and a payload string, so I've left it out.
Finally the main class looks like this.
public class Runner {
/**
*
* @param args The first argument is the port to listen on.
* @throws Exception
*/
public static void main (String[] args) throws Exception {
BlockingQueue<Message> clientBuffer = new LinkedBlockingQueue<> ();
BlockingQueue<Message> appBuffer = new LinkedBlockingQueue<> ();
TestApp appInstance = new TestApp (clientBuffer, appBuffer);
Server serverInstance = new Server (Integer.parseInt (args [0]), clientBuffer, appBuffer);
Thread appThread = new Thread (appInstance);
Thread serverThread = new Thread (serverInstance);
appThread.setName("Application");
serverThread.setName ("Server");
appThread.start ();
serverThread.start ();
appThread.join ();
serverThread.join ();
System.exit (0);
}
}
While the real application will be more complex the TestApp illustrates the basic pattern of use. It blocks on its input queue until there's something there, processes it, then pushes the result onto its output queue.
Each Session object manages a live connection between a particular client and the server. It takes input from the client and converts it to Message objects, and it takes Message objects from the Server and converts them to output to send to the client.
The Server listens for new incoming connections and sets up a Session object for each incoming connection it has. When a Session passes it a Message, it puts it into its upstream queue for the application to deal with.
The difficulty I'm having is getting return messages to travel back down from the TestApp to the various clients. When a message from a client comes in, the Session generates a Message and sends it to the Server, which then puts it into its upstream queue, which is also the input queue for the TestApp. In response, the TestApp generates a response Message and puts it into the output queue, which is also the downstream queue for the Server.
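This means that Sessions need to wait for two unrelated events. They should block until either the client sends a line of input on the socket or the Server passes them a Message to deliver to the client.
As for the Server itself, it also has to wait for two unrelated events: a new client connecting, and a Message arriving on its downstream queue from the application.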
What on the face of it looked like a simple task to achieve is proving a lot more difficult than I first imagined. I expect I'm overlooking something obvious, as I'm still quite new to concurrent programming, but if you can help point out where I'm going wrong I'd appreciate instruction.
Because your implementation is using methods to pass data between client-session-server, you've actually already solved your immediate problem. However, this may not have been your intention. Here's what's happening:
Session's run method is running in its own thread, blocking on the socket. When the server calls waitForServer, this method immediately executes in the server's thread - in Java, if a thread calls a method then that method executes in that thread, and so in this case the Session did not need to unblock. In order to create the problem you are trying to solve, you would need to remove the waitForServer method and replace it with a BlockingQueue messagesFromServer queue - then the Server would place messages in this queue and Session would need to block on it, resulting in Session needing to block on two different objects (the socket and the queue).
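To make the point about calling threads concrete, here is a minimal sketch. The Worker class and its deliver method are hypothetical stand-ins for your Session and waitForServer, not your actual code; the only thing it shows is that a method runs in whichever thread calls it, regardless of which object it belongs to.

```java
public class CallerThreadDemo {
    // Hypothetical stand-in for Session: records which thread ran each method.
    static class Worker implements Runnable {
        volatile String runThread;
        volatile String deliverThread;

        @Override
        public void run() {
            // Executes in the thread that was started with this Runnable.
            runThread = Thread.currentThread().getName();
        }

        // Analogous to waitForServer: executes in whichever thread calls it.
        void deliver() {
            deliverThread = Thread.currentThread().getName();
        }
    }

    // Returns { name of the thread that ran run(), name of the thread that ran deliver() }.
    static String[] observe() throws InterruptedException {
        Worker worker = new Worker();

        Thread sessionThread = new Thread(worker, "ServerSession_1");
        sessionThread.start();
        sessionThread.join();

        // A different thread calls a method on the same object.
        Thread serverThread = new Thread(worker::deliver, "Server");
        serverThread.start();
        serverThread.join();

        return new String[] { worker.runThread, worker.deliverThread };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] names = observe();
        System.out.println("run() executed in: " + names[0]);     // ServerSession_1
        System.out.println("deliver() executed in: " + names[1]); // Server
    }
}
```

Because deliver runs in the "Server" thread, the "ServerSession_1" thread never has to wake up for it, which is why the method-call design sidesteps the two-way blocking problem.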
Assuming that you switch to the implementation where the Session will need to block on two objects, I think you can solve this with a hybrid of the two approaches I described in the comments:
Each Session's socket will need a thread to block on it - I don't see any way around this, unless you're willing to replace this with a fixed thread pool (say, 4 threads) that poll the sockets and sleep for a few dozen milliseconds if there's nothing to read from them.
You can manage all Server -> Sessions traffic with a single queue and a single thread that blocks on it - the Server includes the Session "address" in its payload so that the thread blocking on it knows what to do with the message. If you find that this doesn't scale when you have a lot of sessions, then you can always increase the thread/queue count, e.g. with 32 sessions you can have 4 threads/queues, 8 sessions per thread/queue.
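As a rough sketch of the second point, assuming a simplified Message with a client ID and a payload (your real class may differ), a single dispatcher thread could block on the downstream queue and route each message to the right session's output. The names DispatcherSketch, register, dispatchLoop and the STOP marker are all made up for illustration, and a StringWriter stands in for each session's socket writer so the example runs without a network.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class DispatcherSketch {
    // Hypothetical minimal Message: the session "address" plus a payload.
    static final class Message {
        final int clientId;
        final String payload;

        Message(int clientId, String payload) {
            this.clientId = clientId;
            this.payload = payload;
        }
    }

    // Hypothetical shutdown marker so the loop can be stopped cleanly.
    static final Message STOP = new Message(-1, "");

    // One Writer per session; in the real server this would wrap the socket's output stream.
    private final Map<Integer, Writer> sessionOutputs = new ConcurrentHashMap<>();
    private final BlockingQueue<Message> downstream;

    DispatcherSketch(BlockingQueue<Message> downstream) {
        this.downstream = downstream;
    }

    void register(int sessionId, Writer out) {
        sessionOutputs.put(sessionId, out);
    }

    // Body of the single dispatcher thread: it blocks on the queue and nothing else.
    void dispatchLoop() {
        try {
            while (true) {
                Message m = downstream.take();
                if (m == STOP) {
                    return;
                }
                Writer out = sessionOutputs.get(m.clientId);
                if (out == null) {
                    continue; // the session has already gone away; drop the message
                }
                try {
                    // Lock the writer in case the session's own thread also writes to it.
                    synchronized (out) {
                        out.write(m.payload + "\r\n");
                        out.flush();
                    }
                } catch (IOException e) {
                    sessionOutputs.remove(m.clientId); // treat a failed write as a dead session
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and let the thread exit
        }
    }

    // Runs the dispatcher against two in-memory "sessions" and returns what each received.
    static String[] demo() throws InterruptedException {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        DispatcherSketch dispatcher = new DispatcherSketch(queue);
        StringWriter first = new StringWriter();
        StringWriter second = new StringWriter();
        dispatcher.register(1, first);
        dispatcher.register(2, second);

        Thread t = new Thread(dispatcher::dispatchLoop, "Dispatcher");
        t.start();
        queue.put(new Message(1, "one"));
        queue.put(new Message(2, "two"));
        queue.put(new Message(1, "three"));
        queue.put(STOP);
        t.join();

        return new String[] { first.toString(), second.toString() };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] out = demo();
        System.out.print("Session 1 received:\r\n" + out[0]);
        System.out.print("Session 2 received:\r\n" + out[1]);
    }
}
```

With this arrangement each Session thread only ever blocks on its socket, and the dispatcher only ever blocks on the queue. If one dispatcher turns out to be a bottleneck, one option would be to run several of these, each with its own queue, and pick the queue from the session ID (for example the ID modulo the number of dispatchers).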