Hello fellow developers,
I am trying to implement a fixed WebSocket connection pool. Unfortunately, Netty provides little guidance for FixedChannelPool, and the most you can get is its unit tests.
My client code is based on the example source code, and from this post I found out that bootstrap.handler(new ChannelInitializer<>()) is overridden by the ChannelPool.
I then tried moving the ChannelInitializer block into a ChannelPoolHandler:
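    import io.netty.channel.Channel;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.pool.ChannelPoolHandler;
    import io.netty.handler.codec.http.DefaultHttpHeaders;
    import io.netty.handler.codec.http.HttpClientCodec;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
    import io.netty.handler.codec.http.websocketx.WebSocketVersion;
    import java.net.URI;

    public class SomeConnectionPoolHandler implements ChannelPoolHandler {
        private final URI uri = URI.create("ws://some.url:80");

        // Called by the pool for every new channel; builds the WebSocket pipeline.
        @Override
        public void channelCreated(Channel ch) {
            String protocol = uri.getScheme();
            if (!"ws".equals(protocol)) {
                throw new IllegalArgumentException("Unsupported protocol: " + protocol);
            }
            DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
            final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                    uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
            // WebSocketClientHandler is my own handler class; `connection` is defined elsewhere (not shown).
            final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536), handler);
        }

        @Override
        public void channelReleased(Channel ch) {
            // TODO
        }

        @Override
        public void channelAcquired(Channel ch) {
            // TODO
        }
    }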
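My client code now looks like this:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.pool.FixedChannelPool;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import java.net.URI;

    public class Application {
        // Static so that they are reachable from main().
        private static final URI uri = URI.create("ws://some.url:80");
        private static final EventLoopGroup group = new NioEventLoopGroup();
        private static FixedChannelPool channelPool;

        public static void main(String[] args) {
            Channel ch = null;
            try {
                String protocol = uri.getScheme();
                if (!"ws".equals(protocol)) {
                    throw new IllegalArgumentException("Unsupported protocol: " + protocol);
                }
                Bootstrap b = new Bootstrap();
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.remoteAddress(uri.getHost(), uri.getPort());
                b.group(group);
                b.channel(NioSocketChannel.class);
                SomeConnectionPoolHandler connectionPoolHandler = new SomeConnectionPoolHandler();
                // Pool with at most 8 connections.
                channelPool = new FixedChannelPool(b, connectionPoolHandler, 8);
                ch = channelPool.acquire().sync().get();
                ch.writeAndFlush(new TextWebSocketFrame("test")).sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (ch != null) {
                    channelPool.release(ch);
                }
            }
        }
    }

Now I am getting this error: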
java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:266)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:799)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1291)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:528)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:101)
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
OK, I found a solution to my problem.
The problem was that I was trying to write data to the Channel before the WebSocket handshake had completed.
In the handler's source code you can see that it has a method that returns the ChannelFuture for the WebSocket handshake:

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

Now, after acquiring a Channel from the FixedChannelPool, I call the following method, which waits for the handshake and returns whether it completed successfully:

    private boolean isConnectionReady(Channel ch) {
        try {
            WebSocketClientHandler handler = (WebSocketClientHandler) ch.pipeline().get("handler");
            return handler.handshakeFuture().sync().isSuccess();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

I also gave the WebSocketClientHandler a name in channelCreated(Channel ch), so that it can be looked up in the pipeline:

    DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
    final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
            uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
    final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(65536));
    pipeline.addLast("handler", handler);

I know that this code looks awful right now and should be refactored.
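
One possible direction for a cleaner version is to stop blocking on sync() and instead queue each write behind the handshake. The sketch below only models that ordering with plain JDK classes, so it runs without Netty. HandshakeGate and its methods (completeHandshake, failHandshake, send, sent) are hypothetical names invented for illustration, not Netty API. With Netty, the analogous step would presumably be adding a listener to handshakeFuture() instead of calling sync() on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only model of a pooled connection; not a Netty class.
final class HandshakeGate {
    // Completes when the simulated WebSocket handshake finishes.
    private final CompletableFuture<Void> handshake = new CompletableFuture<>();
    // Frames that actually reached the simulated wire.
    private final List<String> sent = new ArrayList<>();

    // Signals that the simulated handshake succeeded.
    void completeHandshake() {
        handshake.complete(null);
    }

    // Signals that the simulated handshake failed.
    void failHandshake(Throwable cause) {
        handshake.completeExceptionally(cause);
    }

    // Defers the write until the handshake has succeeded. The returned future
    // completes after the frame is recorded, or fails if the handshake failed.
    CompletableFuture<Void> send(String frame) {
        return handshake.thenRun(() -> record(frame));
    }

    private synchronized void record(String frame) {
        sent.add(frame);
    }

    // Returns a snapshot of the frames sent so far.
    synchronized List<String> sent() {
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        HandshakeGate gate = new HandshakeGate();
        // Requested before the handshake, like the failing writeAndFlush call.
        CompletableFuture<Void> write = gate.send("test");
        System.out.println("sent before handshake: " + gate.sent());
        gate.completeHandshake();
        System.out.println("write done: " + write.isDone() + ", sent: " + gate.sent());
    }
}
```

In this model a write requested too early is held back rather than rejected, and it goes out as soon as the handshake completes. If the handshake fails, the future returned by send fails as well and nothing is written, so the caller can react without a blocking readiness check.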