Search code examples
swiftgrand-central-dispatchswift-nio

How to use worker queues in SwiftNIO?


I have a Swift NIO HTTP2 server which handles request within the context's event loop. But I want to process the request in another thread, GCD aync thread pool and get the result back and send it.

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
    context.eventLoop.execute {
       context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in 
            // ...
            var buffer = context.channel.allocator.buffer(capacity: respBody.count)
            buffer.writeString(respBody)        
            context.channel.write(self.wrapOutboundOut(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
            return context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
        }.whenComplete { _ in
            context.close(promise: nil)
        }
    }
}

If I change it to use GCD global queue, how would I return the EventLoopFuture<Void> response?

context.eventLoop.execute {
    context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
        DispatchQueue.global().async {
            return self.send("hello world new ok", to: context.channel).whenComplete({ _ in
                _ = context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
                context.close(promise: nil)
            })
        }
    }
}

Is it okay to use GCD global queue in this way or how will I use worker threads?


The send string function calls the below function to write the body.

private func sendData(_ data: Data, to channel: Channel, context: StreamContext) -> EventLoopFuture<Void> {
    let headers = self.getHeaders(contentLength: data.count, context: context)
    _ = self.sendHeader(status: .ok, headers: headers, to: channel, context: context)
    var buffer = channel.allocator.buffer(capacity: data.count)
    buffer.writeBytes(data)
    let part = HTTPServerResponsePart.body(.byteBuffer(buffer))
    return channel.writeAndFlush(part)
}

Solution

  • The rules in SwiftNIO are:

    • operations on Channels are thread-safe, so you can do them from any thread or queue
    • operations on ChannelHandlerContext are not thread-safe and should only be done from within the ChannelHandler. All of the ChannelHandler's events are called on the right EventLoop.

    So your example is almost correct, just make sure to only use the Channel and never the ChannelHandlerContext from a DispatchQueue or any other thread (that isn't the channel's EventLoop).

    let channel = context.channel // save on the EventLoop
    channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in
        DispatchQueue.global().async {
            self.send("hello world new ok", to: channel).flatMap {
                channel.writeAndFlush(HTTPServerResponsePart.end(nil))
            }.whenComplete {
                channel.close(promise: nil)
            }
        }
    }
    

    There's one assumption I'm making here which is that self.send is okay with being called from any thread and doesn't use ChannelHandlerContext that you might have stored on self. To assess if self.send is okay here, I'd need to know what exactly it does.


    As an aside, in your first code snippet, you have a redundant eventloop.execute:

    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
       // eventLoop.execute not necessary here
       context.channel.getOption(HTTP2StreamChannelOptions.streamID).flatMap { streamID -> EventLoopFuture<Void> in 
            // ...
            var buffer = context.channel.allocator.buffer(capacity: respBody.count)
            buffer.writeString(respBody)        
            context.channel.write(self.wrapOutboundOut(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
            return context.channel.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)))
        }.whenComplete { _ in
            context.close(promise: nil)
        }
    }
    

    The context.eventLoop.execute is unnecessary because any event on a ChannelHandler is always invoked in the correct EventLoop.