I am building an application with several server and client HL7 connections managed by a CommunicationProcess class. Part of the application's functionality is to restart that process when new connections are added. Client connections do not pose a problem because, once the client side stops, there is nothing the server side can do to reconnect. For server connections however, I seem to be getting immediate reconnections from the (rather agressive) client side. This is the code I have to stop a server connection :
public void disconnect()
{
usageServer.getRemoteConnections().forEach((connection) -> connection.close());
usageServer.stopAndWait();
usageServer.getRemoteConnections().forEach((connection) -> connection.close());
}
public void stop()
{
running.set(false);
disconnect();
}
This is my implementation of connectionReceived :
@Override
public void connectionReceived(Connection theC)
{
if (running.get())
{
setStatus(ConnectionStatus.CONNECTED);
}
else
{
theC.close();
}
}
As you can see, the idea is to set a global AtomicBoolean to false when receiving the stop signal from the CommunicationProcess class, which denies any new connections, and stop the server. This, somehow, still allows the client to remain connected during this process. The client side is an application I'm not allowed to name but that has existed for well over a decade and I know for a fact it is not gonna be the issue because I've been supporting it as part of my day job for years and it simply does not behave like that.
Any idea why my code doesn't actually kill the connection? I feel like I've explored a lot of this API and I'm not finding a way to UNREGISTER a connection listener which would probably fix this. Also, there is no way that I can see to extend these server classes as everything is rather ferociously encapsulated and privatized.
Thanks
I was reviewing the code of the HAPI library.
The cause of the behaviour that you describe could be the following.
When the server starts, they creates a component named AcceptorThread
. As it name implies, the responsability of this thread is initialize the ServerSocket
that will be used to receive incoming client connections, and accept them.
This thread, as every Service
abstraction proposed by the API, runs in a loop like this:
/**
* Runs the thread.
*
* @see java.lang.Runnable#run()
*/
public final void run() {
try {
afterStartup();
log.debug("Thread {} entering main loop", name);
while (isRunning()) {
handle();
startupLatch.countDown();
}
log.debug("Thread {} leaving main loop", name);
} catch (RuntimeException t) {
if (t.getCause() != null) {
serviceExitedWithException = t.getCause();
} else {
serviceExitedWithException = t;
}
log.warn("Thread exiting main loop due to exception:", t);
} catch (Throwable t) {
serviceExitedWithException = t;
log.warn("Thread exiting main loop due to exception:", t);
} finally {
startupLatch.countDown();
afterTermination();
}
}
When you invoke the method stopAndWait
in the server, it will try to stop this thread also.
The stop process basically changes the boolean
flag that controls whether the component ``ìsRunning()``` or not.
As you can see, although it sets the flag to false
, the invocation of the method handle
in the loop still must end.
This is the implementation of the AcceptorThread
handle
method:
@Override
protected void handle() {
try {
Socket s = ss.accept();
socketFactory.configureNewAcceptedSocket(s);
if (!queue.offer(new AcceptedSocket(s))) {
log.error("Denied enqueuing server-side socket {}", s);
s.close();
} else
log.debug("Enqueued server-side socket {}", s);
} catch (SocketTimeoutException e) { /* OK - just timed out */
log.trace("No connection established while waiting");
} catch (IOException e) {
log.error("Error while accepting connections", e);
}
}
As you can see, the method invokes ServerSocket.accept
, thus allowing new incoming connections.
In order to disconnect this server side socket, we can call close
from another thread.
In fact, this process is the one implemented by the AcceptorTread
afterTermination
method:
@Override
protected void afterTermination() {
try {
if (ss != null && !ss.isClosed())
ss.close();
} catch (IOException e) {
log.warn("Error during stopping the thread", e);
}
}
Unfortunally - you are right, the API is very close! - there is no a clear way to do that.
One possible solution could be implement your own HL7Service
, name it, MySimpleServer
, using the code of SimpleServer
as a baseline, and just changing the implementation of the method afterTermination
:
/**
* Close down socket
*/
@Override
protected void afterTermination() {
super.afterTermination();
// Terminate server side socket
acceptor.afterTermination();
// Terminate the acceptor thread itself
acceptor.close();
}
Please, pay attention: instead of call acceptor.stop()
we invoke acceptor.afterTermination()
to close directly the underlying server side socket.
To avoid the errors raised by the handle
method in AcceptorThread
, we can also implement a new class from the original one, or just trying to overwrite the handle
method to take into account if the server side socket is closed:
@Override
protected void handle() {
try {
if (ss.isClosed()) {
log.debug("The server-side socket is closed. No new connections will be allowed.");
return;
}
Socket s = ss.accept();
socketFactory.configureNewAcceptedSocket(s);
if (!queue.offer(new AcceptedSocket(s))) {
log.error("Denied enqueuing server-side socket {}", s);
s.close();
} else
log.debug("Enqueued server-side socket {}", s);
} catch (SocketTimeoutException e) { /* OK - just timed out */
log.trace("No connection established while waiting");
} catch (IOException e) {
log.error("Error while accepting connections", e);
}
}
For testing, you can try something like this:
public static void main(String[] args) throws Exception {
HapiContext ctx = new DefaultHapiContext();
HL7Service server = new MySimpleServer(8888);
server.startAndWait();
Connection client1 = ctx.newClient("127.0.0.1", 8888, false);
server.getRemoteConnections().forEach((connection) -> connection.close());
server.stopAndWait();
try {
Connection client2 = ctx.newClient("127.0.0.1", 8888, false);
} catch (Throwable t) {
t.printStackTrace();
}
ctx.close();
System.exit(0);
}