Search code examples
c#threadpoolasync-awaittcplistener

What is the async/await equivalent of a ThreadPool server?


I am working on a tcp server that looks something like this using synchronous apis and the thread pool:

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}

Assuming my goal is to handle as many concurrent connections as possible with the lowest resource usage, this seems that it will be quickly limited by the number of available threads. I suspect that by using Non-blocking Task apis, I will be able to handle much more with fewer resources.

My initial impression is something like:

async Task Serve(){
  while(true){
    var client = await listener.AcceptTcpClientAsync();
    HandleConnectionAsync(client); //fire and forget?
  }
}

But it strikes me that this could cause bottlenecks. Perhaps HandleConnectionAsync will take an unusually long time to hit the first await, and will stop the main accept loop from proceeding. Will this only use one thread ever, or will the runtime magically run things on multiple threads as it sees fit?

Is there a way to combine these two approaches so that my server will use exactly the number of threads it needs for the number of actively running tasks, but so that it will not block threads unnecessarily on IO operations?

Is there an idiomatic way to maximize throughput in a situation like this?


Solution

  • I'd let the Framework manage the threading and wouldn't create any extra threads, unless profiling tests suggest I might need to. Especially, if the calls inside HandleConnectionAsync are mostly IO-bound.

    Anyway, if you like to release the calling thread (the dispatcher) at the beginning of HandleConnectionAsync, there's a very easy solution. You can jump on a new thread from ThreadPool with await Yield(). That works if you server runs in the execution environment which does not have any synchronization context installed on the initial thread (a console app, a WCF service), which is normally the case for a TCP server.

    The following illustrate this (the code is originally from here). Note, the main while loop doesn't create any threads explicitly:

    using System;
    using System.Collections.Generic;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading.Tasks;
    
    class Program
    {
        object _lock = new Object(); // sync lock 
        List<Task> _connections = new List<Task>(); // pending connections
    
        // The core server task
        private async Task StartListener()
        {
            var tcpListener = TcpListener.Create(8000);
            tcpListener.Start();
            while (true)
            {
                var tcpClient = await tcpListener.AcceptTcpClientAsync();
                Console.WriteLine("[Server] Client has connected");
                var task = StartHandleConnectionAsync(tcpClient);
                // if already faulted, re-throw any error on the calling context
                if (task.IsFaulted)
                    await task;
            }
        }
    
        // Register and handle the connection
        private async Task StartHandleConnectionAsync(TcpClient tcpClient)
        {
            // start the new connection task
            var connectionTask = HandleConnectionAsync(tcpClient);
    
            // add it to the list of pending task 
            lock (_lock)
                _connections.Add(connectionTask);
    
            // catch all errors of HandleConnectionAsync
            try
            {
                await connectionTask;
                // we may be on another thread after "await"
            }
            catch (Exception ex)
            {
                // log the error
                Console.WriteLine(ex.ToString());
            }
            finally
            {
                // remove pending task
                lock (_lock)
                    _connections.Remove(connectionTask);
            }
        }
    
        // Handle new connection
        private async Task HandleConnectionAsync(TcpClient tcpClient)
        {
            await Task.Yield();
            // continue asynchronously on another threads
    
            using (var networkStream = tcpClient.GetStream())
            {
                var buffer = new byte[4096];
                Console.WriteLine("[Server] Reading from client");
                var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
                var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
                Console.WriteLine("[Server] Client wrote {0}", request);
                var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
                await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
                Console.WriteLine("[Server] Response has been written");
            }
        }
    
        // The entry point of the console app
        static async Task Main(string[] args)
        {
            Console.WriteLine("Hit Ctrl-C to exit.");
            await new Program().StartListener();
        }
    }
    

    Alternatively, the code might look like below, without await Task.Yield(). Note, I pass an async lambda to Task.Run, because I still want to benefit from async APIs inside HandleConnectionAsync and use await in there:

    // Handle new connection
    private static Task HandleConnectionAsync(TcpClient tcpClient)
    {
        return Task.Run(async () =>
        {
            using (var networkStream = tcpClient.GetStream())
            {
                var buffer = new byte[4096];
                Console.WriteLine("[Server] Reading from client");
                var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
                var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
                Console.WriteLine("[Server] Client wrote {0}", request);
                var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
                await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
                Console.WriteLine("[Server] Response has been written");
            }
        });
    }
    

    Updated, based upon the comment: if this is going to be a library code, the execution environment is indeed unknown, and may have a non-default synchronization context. In this case, I'd rather run the main server loop on a pool thread (which is free of any synchronization context):

    private static Task StartListener()
    {
        return Task.Run(async () => 
        {
            var tcpListener = TcpListener.Create(8000);
            tcpListener.Start();
            while (true)
            {
                var tcpClient = await tcpListener.AcceptTcpClientAsync();
                Console.WriteLine("[Server] Client has connected");
                var task = StartHandleConnectionAsync(tcpClient);
                if (task.IsFaulted)
                    await task;
            }
        });
    }
    

    This way, all child tasks created inside StartListener wouldn't be affected by the synchronization context of the client code. So, I wouldn't have to call Task.ConfigureAwait(false) anywhere explicitly.

    Updated in 2020, someone just asked a good question off-site:

    I was wondering what is the reason for using a lock here? This is not necessary for exception handling. My understanding is that a lock is used because List is not thread safe, therefore the real question is why add the tasks to a list (and incur the cost of a lock under load).

    Since Task.Run is perfectly able to keep track of the tasks it started, my thinking is that in this specific example the lock is useless, however you put it there because in a real program, having the tasks in a list allows us to for example, iterate currently running tasks and terminate the tasks cleanly if the program receives a termination signal from the operating system.

    Indeed, in a real-life scenario we almost always want to keep track of the tasks we start with Task.Run (or any other Task objects which are "in-flight"), for a few reasons:

    • To track task exceptions, which otherwise might be silently swallowed if go unobserved elsewhere.
    • To be able to wait asynchronously for completion of all the pending tasks (e.g., consider a Start/Stop UI button or handling a request to start/stop a inside a headless Windows service).
    • To be able to control (and throttle/limit) the number of tasks we allow to be in-flight simultaneously.

    There are better mechanisms to handle a real-life concurrency workflows (e.g., TPL Dataflow Library), but I did include the tasks list and the lock on purpose here, even in this simple example. It might be tempting to use a fire-and-forget approach, but it's almost never is a good idea. In my own experience, when I did want a fire-and-forget, I used async void methods for that (check this).