Does this pattern achieve a lifecycle similar to Node.js, and are there any problems with it?


I'm in the process of familiarizing myself with the async keyword in C# as well as the library ecosystem as I work towards a practical solution.

My research has led me to the following static main loop code:

Task loop = Task.Factory.StartNew(async () => {

    try {
        using(RedisConnection redis = new RedisConnection("localhost")) {

            var queue = "my_queue";
            var reserved = string.Concat(queue, "_reserved");

            redis.Open();

            while(true) {

                Task.Factory.StartNew(async () => {

                    var pushRequest = await redis.Lists.RemoveLastAndAddFirstString(0, queue, reserved);

                });

            }

        }
    }
    catch(Exception ex) {
        Console.Error.WriteLineAsync(ex.Message);
    }

}, cancellationToken);

loop.Wait();

As you can see, I'm trying to create a non-blocking worker. I feel like I'm nearing a complete solution for the core loop code itself and then obviously what's inside the internal Task.Factory.StartNew call will start to be moved out into separate classes & methods.

A few questions I have:

  • Should any of what I have here right now be moved out of the static main method?
  • I notice that when I run this code, my CPU activity goes up a bit, I assume because I'm creating multiple continuations requesting redis for information. Is this normal or should I be mitigating that?
  • Is this truly asynchronous/non-blocking at this point and will code inside my inner lambda never tie up my main worker thread?
  • Does C# spawn additional threads for the continuations?

Apologies for what might be any amateur errors. As I mentioned, I'm still learning but am hoping to get that last bit of guidance from the community.


Solution

  • You don't need additional pool threads for IO-bound operations (more thoughts on this here). In this light, your refactored code might look like this:

    async Task AsyncLoop(CancellationToken cancellationToken)
    {
        using (RedisConnection redis = new RedisConnection("localhost"))
        {
    
            var queue = "my_queue";
            var reserved = string.Concat(queue, "_reserved");
    
            redis.Open();
    
            while (true)
            {
                // observe cancellation requests
                cancellationToken.ThrowIfCancellationRequested();
    
                var pushRequestTask = redis.Lists.RemoveLastAndAddFirstString(0, queue, reserved);
    
                // continue on a thread-pool thread after await,
                // thanks to ConfigureAwait(false)
                var pushRequest = await pushRequestTask.ConfigureAwait(false);
    
                // process pushRequest
            }
        }
    }
    
    void Loop(CancellationToken cancellationToken)
    {
        try
        {
            AsyncLoop(cancellationToken).Wait();
        }
        catch (Exception ex)
        {
            while (ex is AggregateException && ex.InnerException != null)
                ex = ex.InnerException;
    
            // might be: await Console.Error.WriteLineAsync(ex.Message),
            // but Loop is not async (and await inside catch requires C# 6 or later), so:
            Console.Error.WriteLine(ex.Message);
        }
    }
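
    To show how the cancellation check in AsyncLoop ends the loop, here is a minimal sketch. CancellationDemo, PollAsync and RunFor are hypothetical names introduced for illustration, and Task.Delay stands in for the Redis call, so this is not the actual worker:

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public static class CancellationDemo
    {
        // Same shape as AsyncLoop, with Task.Delay standing in for the Redis call.
        public static async Task PollAsync(CancellationToken cancellationToken)
        {
            while (true)
            {
                // observe cancellation requests once per iteration
                cancellationToken.ThrowIfCancellationRequested();

                await Task.Delay(10).ConfigureAwait(false);
            }
        }

        // Runs the loop until the token is cancelled after the given duration.
        // Returns true if the loop ended with an OperationCanceledException.
        public static bool RunFor(TimeSpan duration)
        {
            using (var cts = new CancellationTokenSource(duration))
            {
                try
                {
                    // GetAwaiter().GetResult() rethrows without an AggregateException wrapper
                    PollAsync(cts.Token).GetAwaiter().GetResult();
                    return false;
                }
                catch (OperationCanceledException)
                {
                    return true;
                }
            }
        }
    }
    ```

    Calling CancellationDemo.RunFor(TimeSpan.FromMilliseconds(100)) blocks for roughly that long and then returns once the token fires. Checking the token once per iteration means an operation already in flight is allowed to finish before the loop exits.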
    

    Note that the absence of Task.Run / Task.Factory.StartNew doesn't mean the code always runs on the same thread. In this example, the code after await will most likely continue on a different thread because of ConfigureAwait(false). However, ConfigureAwait(false) may be redundant if there is no synchronization context on the calling thread (e.g., in a console app). To learn more, refer to the articles listed in the async-await wiki, specifically "It's All About the SynchronizationContext" by Stephen Cleary.

    It's possible to replicate the single-threaded, thread-affine behavior of the Node.js event loop inside AsyncLoop, so that every continuation after await executes on the same thread. You would use a custom task scheduler (or a custom SynchronizationContext and TaskScheduler.FromCurrentSynchronizationContext). It isn't difficult to implement, although I'm not sure that's what you're asking for.
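
    A minimal sketch of the custom SynchronizationContext approach, modeled on the common "async pump" pattern. SingleThreadContext and ThreadAffinityDemo are hypothetical names introduced for illustration, and Task.Delay stands in for the Redis call:

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    // Hypothetical helper: queues every posted continuation and runs it
    // on the thread that called Run.
    public sealed class SingleThreadContext : SynchronizationContext
    {
        private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> _queue =
            new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

        // await continuations captured under this context arrive here.
        public override void Post(SendOrPostCallback d, object state)
        {
            _queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotSupportedException("Send is not supported in this sketch.");
        }

        // Starts asyncMethod and pumps its continuations on the calling thread
        // until the returned task completes.
        public static void Run(Func<Task> asyncMethod)
        {
            var previous = Current;
            var context = new SingleThreadContext();
            SetSynchronizationContext(context);
            try
            {
                Task task = asyncMethod();

                // Stop the pump once the top-level task has finished.
                task.ContinueWith(_ => context._queue.CompleteAdding(), TaskScheduler.Default);

                foreach (var item in context._queue.GetConsumingEnumerable())
                    item.Key(item.Value);

                task.GetAwaiter().GetResult(); // rethrow any failure
            }
            finally
            {
                SetSynchronizationContext(previous);
            }
        }
    }

    public static class ThreadAffinityDemo
    {
        // Returns true if every continuation ran on the thread that called this method.
        public static bool AllContinuationsOnCallerThread()
        {
            int callerThread = Environment.CurrentManagedThreadId;
            var observed = new List<int>();

            SingleThreadContext.Run(async () =>
            {
                for (int i = 0; i < 3; i++)
                {
                    // no ConfigureAwait(false): resume through the captured context
                    await Task.Delay(10);
                    observed.Add(Environment.CurrentManagedThreadId);
                }
            });

            return observed.TrueForAll(id => id == callerThread);
        }
    }
    ```

    The thread affinity only holds while the awaits inside the pumped method capture the context. Adding ConfigureAwait(false) to an await would move the rest of that method onto a pool thread.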

    If this is a server-side app, serializing await continuations on the same thread à la Node.js may hurt the app's scalability (especially if you do some CPU-bound work between awaits).

    To address your specific questions:

    Should any of what I have here right now be moved out of the static main method?

    If you're using a chain of async methods, there's going to be an async-to-Task transition at the topmost stack frame. In the case of a console app, it's OK to block with Task.Wait() inside Main, which is the topmost entry point.
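
    As an illustration of that top-level transition, the sketch below blocks on an async method in two ways. EntryPointDemo and RunAsync are hypothetical stand-ins for the top of your async call chain. It also shows why the Loop method above unwraps AggregateException: Wait() wraps a failure, while GetAwaiter().GetResult() rethrows the original exception:

    ```csharp
    using System;
    using System.Threading.Tasks;

    public static class EntryPointDemo
    {
        // Hypothetical stand-in for the top of an async call chain; it always fails.
        public static async Task RunAsync()
        {
            await Task.Delay(10).ConfigureAwait(false);
            throw new InvalidOperationException("queue unavailable");
        }

        // Blocks with Wait(): a failure surfaces wrapped in AggregateException.
        public static string BlockWithWait()
        {
            try
            {
                RunAsync().Wait();
                return "no exception";
            }
            catch (Exception ex)
            {
                return ex.GetType().Name;
            }
        }

        // Blocks with GetAwaiter().GetResult(): the original exception is rethrown.
        public static string BlockWithGetResult()
        {
            try
            {
                RunAsync().GetAwaiter().GetResult();
                return "no exception";
            }
            catch (Exception ex)
            {
                return ex.GetType().Name;
            }
        }
    }
    ```

    From C# 7.1 onward, Main itself can be declared as static async Task Main, which removes the need for an explicit blocking call at the entry point.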

    I notice that when I run this code, my CPU activity goes up a bit, I assume because I'm creating multiple continuations requesting redis for information. Is this normal or should I be mitigating that?

    It's hard to tell precisely why your CPU activity goes up. I'd rather blame the server side of this app, i.e. whatever listens for and processes Redis requests on localhost.

    Is this truly asynchronous/non-blocking at this point and will code inside my inner lambda never tie up my main worker thread?

    The code I posted is truly non-blocking, provided that new RedisConnection is non-blocking and RemoveLastAndAddFirstString doesn't block anywhere inside its synchronous part.

    Does C# spawn additional threads for the continuations?

    Not explicitly. There is no thread while the asynchronous IO operation is "in flight". However, when the operation completes, an IOCP thread from the ThreadPool is assigned to handle the completion. Whether the code after await continues on this thread or on the original thread depends on the synchronization context. This behavior can be controlled with ConfigureAwait(false), as already mentioned.
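
    One way to observe this yourself is to record the thread before and after an await. The sketch below is only an approximation: ContinuationThreadDemo is a hypothetical name, and Task.Delay (which completes from a timer rather than an IOCP thread) stands in for real asynchronous IO:

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public static class ContinuationThreadDemo
    {
        // Returns (thread id before await, thread id after await,
        // whether the continuation ran on a thread-pool thread).
        public static async Task<Tuple<int, int, bool>> ObserveAsync()
        {
            int before = Environment.CurrentManagedThreadId;

            // No thread is held while the delay is pending.
            await Task.Delay(50).ConfigureAwait(false);

            int after = Environment.CurrentManagedThreadId;
            bool onPoolThread = Thread.CurrentThread.IsThreadPoolThread;

            return Tuple.Create(before, after, onPoolThread);
        }
    }
    ```

    In a console app, calling ContinuationThreadDemo.ObserveAsync() from the main thread would typically report two different thread ids, with the continuation on a pool thread. The exact ids vary from run to run.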