I've recently been playing around with the new Async CTP, and I've come across a situation where I'm not sure how to proceed.
In my current code base, I'm using a concept of "jobs" and a "job manager". Jobs exist solely for the purpose of handling an initial message, sending a reply, and then waiting for the response to that reply.
I already have existing code based around synchronous sockets, where a network thread is waiting on data to arrive, and then passing it along to an event handler, and eventually to the job manager.
The job manager looks for what job would handle the message, and passes it along.
So the scenario is this:
1. Job manager gets a new message and launches a job.
2. The job starts, processes the message, and sends a reply message.
3. At this point the job would wait for a response to the reply.
Here's a pseudocode example:
class MyJob : Job
{
    public override void RunJob( IPacketMsg packet )
    {
        // handle packet
        var myReply = new Packet();
        SendReply( myReply );
        await GetResponse();
    }
}
But I'm not entirely sure how to proceed at step 3. The job manager will get the response and then hand it along to the running job. But I'm not sure how to make the job wait for the response.
I've considered creating an awaited Task that simply blocks on a WaitHandle, but is this the best solution?
Are there any other things I could do in this case?
Edit: On the subject of the Async CTP, what happens in a situation where the UI is not being used? I've read over Eric Lippert's Async blog, but I don't believe it ever touched on the subject of how everything works in the background without a UI thread (does it spin off a background worker or...?)
First off, I should mention that the Async CTP handles asynchronous operations very well, but asynchronous events not so much. You may want to consider an Rx-based approach. But let's proceed for the moment with the Async CTP.
You have two basic options to create Tasks:
- Task.Factory.StartNew will run a delegate on the thread pool. Custom task factories and schedulers give you more options for task delegates (e.g., specifying the delegate must be run on an STA thread).
- TaskFactory.FromAsync wraps an existing Begin/End method pair, TaskEx.FromResult returns a "future constant", and TaskCompletionSource can be used to control a Task explicitly (both FromAsync and FromResult use TCS internally).
If the job processing is CPU-bound, it makes sense to pass it off to Task.Factory.StartNew. I'm going to assume the job processing is CPU-bound.
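To make the options above concrete, here is a minimal sketch of each way of obtaining a Task. The class and method names (TaskCreationSketch, SumOnThreadPool, ReadAsync, Constant, CreatePending) are made up for illustration, and Task.FromResult is used as the released-framework counterpart of the CTP's TaskEx.FromResult.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper class; none of these names come from the Async CTP itself.
public static class TaskCreationSketch
{
    // Delegate-based: the lambda runs on a thread-pool thread.
    public static Task<int> SumOnThreadPool(int[] values)
    {
        return Task.Factory.StartNew(() =>
        {
            int sum = 0;
            foreach (int v in values) sum += v;
            return sum;
        });
    }

    // Begin/End wrapper: the returned task completes when EndRead has run.
    public static Task<int> ReadAsync(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }

    // "Future constant": a task that is already completed with the given value.
    // (In the Async CTP this was TaskEx.FromResult.)
    public static Task<int> Constant(int value)
    {
        return Task.FromResult(value);
    }

    // Explicit control: the task stays pending until someone calls
    // SetResult, SetException, or SetCanceled on the returned source.
    public static TaskCompletionSource<string> CreatePending()
    {
        return new TaskCompletionSource<string>();
    }
}
```

Only the first method ties up a thread-pool thread while the work runs; the TaskCompletionSource variant is the one that fits "wait until some other code hands me a value", which is what the response handling below relies on.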
Job manager pseudo-code:
// Responds to a new message by starting a new job on the thread pool.
private void RespondToNewMessage(IPacketMsg message)
{
    IJob job = ..;
    // StartNew takes a delegate, so wrap the call in a lambda.
    Task.Factory.StartNew(() => job.RunJob(message));
}

// Holds tasks waiting for a response.
private ConcurrentDictionary<int, TaskCompletionSource<IResponse>> responseTasks = ..;

// Asynchronously gets a response for the specified reply.
public Task<IResponse> GetResponseForReplyAsync(int replyId)
{
    var tcs = new TaskCompletionSource<IResponse>();
    // ConcurrentDictionary has no Add; TryAdd returns false if the id is already registered.
    if (!responseTasks.TryAdd(replyId, tcs))
        throw new InvalidOperationException("A response is already pending for reply " + replyId);
    return tcs.Task;
}

// Responds to a new response by removing its task and then completing it.
private void RespondToResponse(IResponse response)
{
    TaskCompletionSource<IResponse> tcs;
    // Remove the registration first, then complete the task.
    if (responseTasks.TryRemove(response.ReplyId, out tcs))
        tcs.TrySetResult(response);
}
The idea is that the job manager also manages a list of outstanding responses. In order for this to happen, I introduced a simple int reply identifier that the job manager can use to determine which response goes with which reply.
Now jobs can work like this:
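// The method must be marked async to use await. Returning Task (which assumes
// the base Job.RunJob is declared that way) lets callers observe completion and exceptions.
public override async Task RunJob(IPacketMsg packet)
{
    // handle packet
    var myReply = new Packet();
    // Register for the response before sending the reply, then await it afterwards.
    var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId);
    SendReply(myReply);
    await response;
}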
There are a few tricky things since we're placing the jobs on the thread pool:
- GetResponseForReplyAsync must be invoked (registering the task) before the reply is sent, and is then awaited later. This is to avoid the situation where a reply may be sent and a response received before we have a chance to register for it.
- RespondToResponse will remove the task registration before completing it, just in case completing the task causes another reply to be sent with the same id.
If the jobs are short enough that they don't need to be placed on the thread pool, then the solution can be simplified.
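The two ordering points above can be seen in a small self-contained sketch. Everything here is a stand-in: ResponseRegistry and OrderingDemo are hypothetical names, the response is a plain string rather than IResponse, and "sending" is simulated by delivering the response immediately, which is the worst case for the race.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the job manager's response bookkeeping.
public sealed class ResponseRegistry
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<string>> pending =
        new ConcurrentDictionary<int, TaskCompletionSource<string>>();

    // Registers interest in a response; call this BEFORE sending the reply.
    public Task<string> GetResponseForReplyAsync(int replyId)
    {
        var tcs = new TaskCompletionSource<string>();
        if (!pending.TryAdd(replyId, tcs))
            throw new InvalidOperationException("Reply id already registered: " + replyId);
        return tcs.Task;
    }

    // Delivers a response. Returns false if nobody is registered for that id.
    public bool RespondToResponse(int replyId, string response)
    {
        TaskCompletionSource<string> tcs;
        if (!pending.TryRemove(replyId, out tcs))
            return false; // nothing registered yet, so the response is dropped
        // The registration is already removed at this point, so code that runs
        // as a result of completion can register the same id again.
        tcs.TrySetResult(response);
        return true;
    }
}

public static class OrderingDemo
{
    // Correct order: register, then send, then await.
    public static async Task<string> RegisterThenSend(ResponseRegistry registry, int replyId)
    {
        Task<string> response = registry.GetResponseForReplyAsync(replyId);
        registry.RespondToResponse(replyId, "ack"); // simulated send; response arrives at once
        return await response;
    }

    // Wrong order: the response arrives while nothing is registered.
    public static bool SendThenRegister(ResponseRegistry registry, int replyId)
    {
        bool delivered = registry.RespondToResponse(replyId, "ack"); // response is lost
        registry.GetResponseForReplyAsync(replyId);                  // this task never completes
        return delivered;
    }
}
```

In this sketch RegisterThenSend completes with the simulated response, while SendThenRegister reports that the response was not delivered and leaves a task that would be awaited forever. A real job manager might prefer to buffer early responses instead of dropping them, but that is a design choice outside what the answer describes.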