How to use FixedChannelPool with WebSockets?


Hello fellow developers...

I am trying to implement a fixed-size WebSocket connection pool. Unfortunately, Netty provides little guidance on FixedChannelPool; the most you can find is its unit tests.

My client code is based on the example source code.

From this post I found out that bootstrap.handler(new ChannelInitializer<>()) is overridden by ChannelPool.

So I moved the ChannelInitializer logic into a ChannelPoolHandler:

public class SomeConnectionPoolHandler implements ChannelPoolHandler {

    private final URI uri = URI.create("ws://some.url:80");

    @Override
    public void channelCreated(Channel ch) {
        String protocol = uri.getScheme();
        if (!"ws".equals(protocol)) {
            throw new IllegalArgumentException("Unsupported protocol: " + protocol);
        }

        DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
        final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpClientCodec(),  new HttpObjectAggregator(65536), handler);
    }

    @Override
    public void channelReleased(Channel ch) {
        //TODO
    }

    @Override
    public void channelAcquired(Channel ch) {
        //TODO
    }
}

And now my client code looks like:

public class Application {
    // Must be static to be readable from main().
    private static final URI uri = URI.create("ws://some.url:80");

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        FixedChannelPool channelPool = null;
        Channel ch = null;
        try {
            String protocol = uri.getScheme();
            if (!"ws".equals(protocol)) {
                throw new IllegalArgumentException("Unsupported protocol: " + protocol);
            }
            Bootstrap b = new Bootstrap();
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.remoteAddress(uri.getHost(), uri.getPort());

            b.group(group);
            b.channel(NioSocketChannel.class);
            SomeConnectionPoolHandler connectionPoolHandler = new SomeConnectionPoolHandler();

            // At most 8 connections in the pool.
            channelPool = new FixedChannelPool(b, connectionPoolHandler, 8);
            ch = channelPool.acquire().sync().get();
            ch.writeAndFlush(new TextWebSocketFrame("test")).sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (ch != null) {
                channelPool.release(ch);
            }
        }
    }
}

Now I am getting this error:

java.lang.UnsupportedOperationException: unsupported message type: TextWebSocketFrame (expected: ByteBuf, FileRegion)
    at io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage(AbstractNioByteChannel.java:266)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:799)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:528)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:101)
    at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
    at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

Solution

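  • OK, I found a solution to my problem.

    The problem was that I was writing to the Channel before the WebSocket handshake had completed. As the stack trace shows, the TextWebSocketFrame passed through HttpClientCodec unchanged and reached the socket layer, which only accepts ByteBuf or FileRegion; no WebSocket frame encoder was in the pipeline yet.

    The handler's source code exposes a method that returns the ChannelFuture for the WebSocket handshake: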

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }
    

    Now, after acquiring a Channel from the FixedChannelPool, I call the following method, which blocks until the handshake finishes and returns whether it succeeded:

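    private boolean isConnectionReady(Channel ch) {
        try {
            // Look up the handler by the name it was registered under in channelCreated().
            WebSocketClientHandler handler = (WebSocketClientHandler) ch.pipeline().get("handler");
            // sync() blocks until the handshake finishes and rethrows its failure, if any.
            return handler.handshakeFuture().sync().isSuccess();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }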
    

    I also gave the WebSocketClientHandler a name in channelCreated(Channel ch), so that it can be looked up in the pipeline:

    DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
    final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, httpHeaders, 1280000);
    final WebSocketClientHandler handler = new WebSocketClientHandler(handshaker, connection);
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new HttpClientCodec(),  new HttpObjectAggregator(65536));
    pipeline.addLast("handler", handler);
    

    I know that this code now looks awful and should be refactored.
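
    One possible direction for that refactoring is to chain the write onto the handshake future instead of blocking on sync(). The sketch below models only the ordering, using plain JDK types rather than Netty. HandshakeGateSketch, FakeChannel and writeWhenReady are hypothetical names introduced for illustration; they are not part of Netty or of the code above. In Netty, the analogous step would presumably be adding a listener to handler.handshakeFuture().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** JDK-only model of "do not write until the handshake future completes". */
class HandshakeGateSketch {

    /** Hypothetical stand-in for a pooled channel; not a Netty type. */
    static final class FakeChannel {
        // Completed by the "handler" once the WebSocket upgrade has finished.
        final CompletableFuture<Void> handshakeFuture = new CompletableFuture<>();
        final List<String> sent = new ArrayList<>();

        void write(String frame) {
            // Mirrors the failure in the question: frames cannot be sent
            // before the handshake has completed successfully.
            if (!handshakeFuture.isDone() || handshakeFuture.isCompletedExceptionally()) {
                throw new UnsupportedOperationException("handshake not complete: " + frame);
            }
            sent.add(frame);
        }
    }

    /** Chains the write onto the handshake instead of blocking the caller. */
    static CompletableFuture<Void> writeWhenReady(FakeChannel ch, String frame) {
        return ch.handshakeFuture.thenRun(() -> ch.write(frame));
    }

    public static void main(String[] args) {
        FakeChannel ch = new FakeChannel();
        CompletableFuture<Void> pending = writeWhenReady(ch, "test");
        System.out.println("sent before handshake: " + ch.sent); // prints []
        ch.handshakeFuture.complete(null);                       // handshake finishes
        System.out.println("sent after handshake: " + ch.sent);  // prints [test]
        System.out.println("write done: " + pending.isDone());   // prints true
    }
}
```

    The caller never blocks: the frame is queued behind the handshake and sent by whichever thread completes it. If the handshake fails, the returned future completes exceptionally and nothing is written.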