I'm familiarizing myself with the async keyword in C#, as well as the library ecosystem, as I work towards a practical solution.
My research has led me to the following static main loop code:
Task loop = Task.Factory.StartNew(async () => {
    try {
        using(RedisConnection redis = new RedisConnection("localhost")) {
            var queue = "my_queue";
            var reserved = string.Concat(queue, "_reserved");
            redis.Open();
            while(true) {
                Task.Factory.StartNew(async () => {
                    var pushRequest = await redis.Lists.RemoveLastAndAddFirstString(0, queue, reserved);
                });
            }
        }
    }
    catch(Exception ex) {
        Console.Error.WriteLineAsync(ex.Message);
    }
}, cancellationToken);
loop.Wait();
As you can see, I'm trying to create a non-blocking worker. I feel like I'm nearing a complete solution for the core loop code itself, after which whatever is inside the inner Task.Factory.StartNew call will be moved out into separate classes and methods.
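A few questions I have:
- Should any of what I have here right now be moved out of the static main method?
- I notice that when I run this code, my CPU activity goes up a bit, I assume because I'm creating multiple continuations requesting redis for information. Is this normal or should I be mitigating that?
- Is this truly asynchronous/non-blocking at this point and will code inside my inner lambda never tie up my main worker thread?
- Does C# spawn additional threads for the continuations?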
Apologies for any amateur errors. As I mentioned, I'm still learning, but I'm hoping to get that last bit of guidance from the community.
You don't need additional pool threads for IO-bound operations. In this light, your refactored code might look like this:
async Task AsyncLoop(CancellationToken cancellationToken)
{
    using (RedisConnection redis = new RedisConnection("localhost"))
    {
        var queue = "my_queue";
        var reserved = string.Concat(queue, "_reserved");
        redis.Open();
        while (true)
        {
            // observe cancellation requests
            cancellationToken.ThrowIfCancellationRequested();
            var pushRequestTask = redis.Lists.RemoveLastAndAddFirstString(0, queue, reserved);
            // continue on a random thread after await,
            // thanks to ConfigureAwait(false)
            var pushRequest = await pushRequestTask.ConfigureAwait(false);
            // process pushRequest
        }
    }
}
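void Loop(CancellationToken cancellationToken)
{
    try
    {
        // pass the token through so AsyncLoop can observe cancellation
        AsyncLoop(cancellationToken).Wait();
    }
    catch (Exception ex)
    {
        // Wait() wraps failures in AggregateException; unwrap to the root cause
        while (ex is AggregateException && ex.InnerException != null)
            ex = ex.InnerException;
        // might be: await Console.Error.WriteLineAsync(ex.Message),
        // but await inside catch is not allowed before C# 6
        // (and Loop is not an async method), so:
        Console.Error.WriteLine(ex.Message);
    }
}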
Note that the absence of Task.Run / Task.Factory.StartNew doesn't mean the code always runs on the same thread. In this example, the code after await will most likely continue on a different thread because of ConfigureAwait(false). However, ConfigureAwait(false) may be redundant if there is no synchronization context on the calling thread (e.g., in a console app). To learn more, refer to the articles listed in the async-await Wiki, specifically "It's All About the SynchronizationContext" by Stephen Cleary.
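To see which thread the continuation lands on, a small console sketch can help. ContextDemo and ShowThreadsAsync are hypothetical names, and Task.Delay merely stands in for the Redis call; the thread IDs it prints will vary from run to run.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContextDemo
{
    // Hypothetical helper: reports the synchronization context and the
    // thread IDs before and after an await.
    public static async Task ShowThreadsAsync()
    {
        // On the main thread of a plain console app this is expected to be null.
        Console.WriteLine("Context is null: " + (SynchronizationContext.Current == null));
        Console.WriteLine("Before await: thread " + Thread.CurrentThread.ManagedThreadId);

        // Task.Delay stands in for an IO-bound call (an assumption for this sketch).
        await Task.Delay(100).ConfigureAwait(false);

        // With no context to return to, the continuation runs wherever the
        // completion is handled, usually a pool thread.
        Console.WriteLine("After await: thread " + Thread.CurrentThread.ManagedThreadId);
    }

    public static void Main()
    {
        ShowThreadsAsync().Wait();
    }
}
```

Removing ConfigureAwait(false) from this sketch should make no visible difference in a console app, which is the redundancy mentioned above.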
It's possible to replicate the single-threaded, thread-affine behavior of the Node.js event loop inside AsyncLoop, so that every continuation after await executes on the same thread. You would use a custom task scheduler (or a custom SynchronizationContext together with TaskScheduler.FromCurrentSynchronizationContext). It isn't difficult to implement, although I'm not sure that's what you're asking for.
If this is a server-side app, serializing await continuations on the same thread à la Node.js may hurt the app's scalability (especially if you do CPU-bound work between awaits).
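For illustration, here is a minimal sketch of the custom SynchronizationContext approach. The names SingleThreadSynchronizationContext and EventLoopDemo are hypothetical, Task.Delay stands in for the Redis call, and a production version would also need error handling and an implementation of Send.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: queues every posted continuation and runs it on the
// single thread that calls RunOnCurrentThread.
public sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> _queue =
        new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    // Called by the await machinery to schedule a continuation.
    public override void Post(SendOrPostCallback d, object state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    // The "event loop": blocks and runs queued callbacks until Complete is called.
    public void RunOnCurrentThread()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
            item.Key(item.Value);
    }

    public void Complete()
    {
        _queue.CompleteAdding();
    }
}

public static class EventLoopDemo
{
    static async Task WorkAsync(List<int> threadIds)
    {
        for (int i = 0; i < 3; i++)
        {
            // No ConfigureAwait(false) here: the continuation is posted
            // back to the captured context.
            await Task.Delay(10);
            threadIds.Add(Thread.CurrentThread.ManagedThreadId);
        }
    }

    // Returns true if every continuation ran on the thread that runs the loop.
    public static bool Run()
    {
        var previous = SynchronizationContext.Current;
        var context = new SingleThreadSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            var threadIds = new List<int>();
            Task task = WorkAsync(threadIds);
            // Stop the loop once the async work has finished.
            task.ContinueWith(_ => context.Complete(), TaskScheduler.Default);
            context.RunOnCurrentThread();
            task.GetAwaiter().GetResult();
            int loopThread = Thread.CurrentThread.ManagedThreadId;
            return threadIds.TrueForAll(id => id == loopThread);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

The trade-off is the one described above: every continuation waits its turn in one queue, so any CPU-bound work between awaits delays everything queued behind it.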
To address your specific questions:
Should any of what I have here right now be moved out of the static main method?
If you're using a chain of async methods, there's going to be an async-to-Task transition at the topmost stack frame. In a console app, it's OK to block with Task.Wait() inside Main, which is the topmost entry point.
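As a rough sketch of that top-level transition: TopLevelDemo and WorkAsync are hypothetical names, and Task.Delay again stands in for real IO. This version blocks with GetAwaiter().GetResult() rather than Wait(); both block the calling thread, but GetResult() rethrows the original exception instead of wrapping it in an AggregateException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TopLevelDemo
{
    // Hypothetical async work; the delay and the return value are placeholders.
    public static async Task<int> WorkAsync(CancellationToken token)
    {
        await Task.Delay(50, token).ConfigureAwait(false);
        return 42;
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Turn Ctrl+C into a cancellation request instead of killing the process.
            Console.CancelKeyPress += (sender, e) =>
            {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                // The only place in the program that blocks on a Task.
                int result = WorkAsync(cts.Token).GetAwaiter().GetResult();
                Console.WriteLine(result);
            }
            catch (OperationCanceledException)
            {
                Console.Error.WriteLine("Cancelled.");
            }
        }
    }
}
```

Everything below Main stays async all the way down, so the blocking call appears exactly once.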
I notice that when I run this code, my CPU activity goes up a bit, I assume because I'm creating multiple continuations requesting redis for information. Is this normal or should I be mitigating that?
It's hard to tell precisely why your CPU activity goes up. I'd rather blame the server side of this app, i.e. whatever listens to and processes the redis requests on localhost.
Is this truly asynchronous/non-blocking at this point and will code inside my inner lambda never tie up my main worker thread?
The code I posted is truly non-blocking, provided that new RedisConnection is non-blocking and RemoveLastAndAddFirstString doesn't block anywhere inside its synchronous part.
Does C# spawn additional threads for the continuations?
Not explicitly. There is no thread while the asynchronous IO operation is "in flight". However, when the operation completes, an IOCP thread from the ThreadPool is assigned to handle the completion. Whether the code after await continues on that thread or on the original thread depends on the synchronization context. This behavior can be controlled with ConfigureAwait(false), as already mentioned.