I'm following Quarkus - Getting started with Reactive guide and along the sample using Server Sent Events I struggle with the ClosedChannelException. The resteasy resource handler uses vert.x and netty under the hood and the SmallRye Mutiny library for reactive streams.
The resource handler produces count messages in one second intervals.
@Path("/hello")
public class ReactiveGreetingResource {
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("/stream/{count}/{name}")
public Multi<String> greetingsAsStream(@PathParam int count, @PathParam String name) {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().apply(n -> String.format("hello %s - %d", name, n))
.transform().byTakingFirstItems(count);
}
}
The messages are sent to the client upon request. The client side is a simple javascript function embedded into an html page:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html>
<html><head>
<meta charset="UTF-8" />
<title>SSE with Vert.x - Quarkus</title>
<script type="application/javascript">
var eventSource = new EventSource("/hello/stream/5/testname");
eventSource.onmessage = function(event) {
console.log(event.data);
var container = document.getElementById("container");
var paragraph = document.createElement("p");
paragraph.innerHTML = event.data;
container.appendChild(paragraph);
}
</script>
</head>
<body>
<input type="button" onclick="stop()" value="stop sse"/>
<div id="container"></div>
</body>
</html>
On the client side all works as expected. The events are handled and the click on the stop button (while events arrive) closes the EventSource.
But on the server side it causes this exception:
2020-05-09 10:26:33,341 WARN [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-13) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@241d306c(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
How am I supposed to avoid or handle this exception appropriately?
Workaround
The warning disappears after adding quarkus-undertow as a dependency in pom.xml
<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-undertow</artifactId> </dependency>
This idea was mentioned in a response to the bug report.