I'm using two ReadAsync() calls and Task.WhenAny() to handle two NetworkStreams (TcpClient). Will the following await-based code miss any data?
I am writing a piece of code that is intended to act as a man-in-the-middle filter on a TCP stream (to later allow filtering/monitoring of certain packets).
The generalised logic should be:
Error handling exists where listed. Have I missed anything important?
I used the following answer to a question about ".Net 4.5 Async Feature for Socket Programming" as a starting point:
```csharp
var read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
var read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
try
{
    while (true)
    {
        Task<int> read_task_occurred;
        try
        {
            read_task_occurred = await Task.WhenAny(read_task_from_client, read_task_from_server);
            //Q1: What happens if both streams have new data at EXACTLY the same time?
            if (read_task_occurred.Status != TaskStatus.RanToCompletion)
            {
                // WhenAny does not throw when a read faults; the failure is stored on the returned task.
                Trace.WriteLine(string.Format("[{0}] - Task failure: {1}", ID, read_task_occurred.Exception));
                break;
            }
        }
        catch (AggregateException aex)
        {
            // The individual failures are in InnerExceptions, not in aex.Data.
            for (int i = 0; i < aex.InnerExceptions.Count; i++)
            {
                Trace.WriteLine(string.Format("[{0}] - Aggregate failure {1} - {2}", ID, i, aex.InnerExceptions[i]));
            }
            break;
        }
        var bytes_read = read_task_occurred.Result;
        if (bytes_read == 0)
        {
            // If a read-operation returns zero, the stream has closed.
            OneStreamHasClosed(read_task_from_client, read_task_from_server, read_task_occurred);
            break;
        }
        if (read_task_occurred == read_task_from_client)
        {
            BytesFromClient += bytes_read;
            Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
            await tx_stream.WriteAsync(rx_buffer, 0, bytes_read);
            await FileStream_Incoming.WriteAsync(rx_buffer, 0, bytes_read);
            // Q2: Any chance of the WriteAsync taking too long?
            // (e.g. rx_buffer begins to be filled again before being written to tx_stream or the filestream)
            read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
        }
        else if (read_task_occurred == read_task_from_server)
        {
            BytesFromServer += bytes_read;
            Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
            await rx_stream.WriteAsync(tx_buffer, 0, bytes_read);
            await FileStream_Outgoing.WriteAsync(tx_buffer, 0, bytes_read);
            read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
        }
    }
}
finally
{
    FileStream_Incoming.Close();
    FileStream_Outgoing.Close();
}
```
So far, this seems to work as expected, capturing and logging multiple streams. However, I'm not certain whether I'm using the await statements safely.
This will later run on multiple threads (possibly one per incoming connection, but that's a separate topic).
By refactoring the original "await tx_stream.Write..." and "await xxx_FileStream.Write..." calls as follows, I believe I have addressed one main race condition at Q2. I'm still not sure whether this is the best/recommended solution:
```csharp
// Code changed to a call to MultiWrite
private void MultiWrite(byte[] buffer, int bytes_read, Stream s1, Stream s2)
{
    Task writer1 = s1.WriteAsync(buffer, 0, bytes_read);
    Task writer2 = s2.WriteAsync(buffer, 0, bytes_read);
    Task.WaitAll(writer1, writer2);
}
```
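A non-blocking variant of MultiWrite is sketched below, assuming the caller can itself be async. Task.WaitAll blocks the calling thread until both writes finish; Task.WhenAll instead returns a task that can be awaited, so no thread is held while the writes are outstanding. Both writes only read from buffer, so running them concurrently is safe as long as the buffer is not refilled before the returned task completes. The name MultiWriteAsync and the MemoryStream stand-ins are hypothetical.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class Relay
{
    // Hypothetical async counterpart of MultiWrite: writes the same bytes to
    // both streams concurrently and completes when both writes have finished.
    public static Task MultiWriteAsync(byte[] buffer, int bytes_read, Stream s1, Stream s2)
    {
        Task writer1 = s1.WriteAsync(buffer, 0, bytes_read);
        Task writer2 = s2.WriteAsync(buffer, 0, bytes_read);
        // Awaiting the combined task (instead of Task.WaitAll) keeps the
        // calling thread free while the writes are outstanding.
        return Task.WhenAll(writer1, writer2);
    }

    public static async Task Main()
    {
        var buffer = new byte[] { 10, 20, 30, 40 };
        var network = new MemoryStream(); // stand-in for tx_stream
        var log = new MemoryStream();     // stand-in for the capture file

        await MultiWriteAsync(buffer, 3, network, log);

        // Both destinations received only the first three bytes.
        Console.WriteLine(BitConverter.ToString(network.ToArray())); // 0A-14-1E
        Console.WriteLine(BitConverter.ToString(log.ToArray()));     // 0A-14-1E
    }
}
```

In a copy loop the call would become `await MultiWriteAsync(rx_buffer, bytes_read, tx_stream, FileStream_FromClient);`, and the next ReadAsync into rx_buffer only starts after that await returns.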
I've been told that await does not allow concurrent tasks to run. This puzzles me, because I then cannot understand how or why the following code runs:
```csharp
private async Task<char> SimpleTask(char x, int sleep_ms)
{
    return await Task.Run(() => { Console.Write(x); Thread.Sleep(sleep_ms); return x; });
}

internal async void DoStuff()
{
    var a_task = SimpleTask('a', 100);
    var b_task = SimpleTask('b', 250);
    var c_task = SimpleTask('c', 333);
    while (true)
    {
        var write_task_occurred = await Task.WhenAny(a_task, b_task, c_task);
        var char_written = write_task_occurred.Result;
        switch (char_written)
        {
            case 'a': a_task = SimpleTask('a', 100); break;
            case 'b': b_task = SimpleTask('b', 250); break;
            case 'c': c_task = SimpleTask('c', 333); break;
        }
    }
}
```
The snippet above does run and, as I would expect, produces the following multi-threaded nonsense:
aabacabaacabaacbaaabcaabacaabacabaabacaabacabaacabaacbaabacaabacabaacabaabacaab
Can anyone explain where and why the above approach is wrong, and if so, how it could be improved?
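The DoStuff output is consistent with how await works: a task starts running when SimpleTask (or Task.Run, or ReadAsync) is called, not when it is awaited, and await only suspends the calling method until the awaited task finishes. The sketch below illustrates this with a hypothetical Work helper that uses Task.Delay instead of Thread.Sleep, with arbitrary 300 ms delays. Starting all three tasks before awaiting should take roughly as long as one delay, while awaiting each before starting the next should take roughly the sum.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

static class AwaitDemo
{
    // Hypothetical helper: completes after ms milliseconds without blocking a thread.
    static async Task<char> Work(char x, int ms)
    {
        await Task.Delay(ms);
        return x;
    }

    // Starts all three tasks first, then awaits them; returns elapsed milliseconds.
    public static async Task<long> RunConcurrentlyAsync()
    {
        var sw = Stopwatch.StartNew();
        Task<char> a = Work('a', 300); // already running
        Task<char> b = Work('b', 300); // already running
        Task<char> c = Work('c', 300); // already running
        await Task.WhenAll(a, b, c);   // waits, but does not start anything
        return sw.ElapsedMilliseconds;
    }

    // Awaits each task before starting the next; returns elapsed milliseconds.
    public static async Task<long> RunSequentiallyAsync()
    {
        var sw = Stopwatch.StartNew();
        await Work('a', 300);
        await Work('b', 300);
        await Work('c', 300);
        return sw.ElapsedMilliseconds;
    }

    public static async Task Main()
    {
        Console.WriteLine($"concurrent: {await RunConcurrentlyAsync()} ms");
        Console.WriteLine($"sequential: {await RunSequentiallyAsync()} ms");
    }
}
```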
I have integrated the "write to output-stream and a file, ensure both outputs have the data in 'buffer' before further Read()" requirement, and have also split the code to call MultiWrite() as per my earlier update to Q2.
As per the suggestion(s) by @usr and @Pekka, I have split the code into two methods as below...
```csharp
private async Task ProcessStreams_Good()
{
    // Start both directions so they run concurrently...
    Task t1 = CopyClientToServer(), t2 = CopyServerToClient();
    // ...then wait for both to finish before reporting the totals.
    await Task.WhenAll(t1, t2);
    Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}", ID, BytesFromClient, BytesFromServer));
    Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}

private async void ProcessStreams_Broken()
{
    // CopyServerToClient() does not start until CopyClientToServer() has finished,
    // i.e. until the client closes its side of the connection.
    await CopyClientToServer(); await CopyServerToClient();
    Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}\r\n", ID, BytesFromClient, BytesFromServer));
    Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}

private async Task CopyClientToServer()
{
    var bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
    while (bytes_read > 0)
    {
        BytesFromClient += bytes_read;
        Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
        MultiWrite(rx_buffer, bytes_read, tx_stream, FileStream_FromClient);
        bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
    }
}

private async Task CopyServerToClient()
{
    var bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
    while (bytes_read > 0)
    {
        BytesFromServer += bytes_read;
        Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
        MultiWrite(tx_buffer, bytes_read, rx_stream, FileStream_FromServer);
        bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
    }
}
```
Yes, I am aware of the reason why ProcessStreams_Broken() fails and ProcessStreams_Good() works as expected.
Q: This new code is slightly neater, but is it any "better"?
After the question closed, I came across a Best Practices for async/await link which was quite helpful.
await and WhenAny do not start any operation. They merely wait for a running operation to complete. All reads that you have started will complete eventually and data will be taken from the streams. This is true whether you observe the result or not.
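A small illustration of this point, using a MemoryStream as a stand-in for a NetworkStream (an assumption: a MemoryStream read completes immediately, whereas a network read completes when data arrives). The first read is started but not awaited before the second read begins, yet the bytes it took are already gone from the stream. UnobservedReadDemo is a hypothetical name.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class UnobservedReadDemo
{
    // Returns (bytes taken by the first, not-yet-observed read, bytes left for a later read).
    public static async Task<(int First, int Second)> RunAsync()
    {
        var stream = new MemoryStream(new byte[] { 1, 2, 3, 4 });
        var first = new byte[4];
        var second = new byte[4];

        // Start a read and deliberately do not await it yet.
        Task<int> pending = stream.ReadAsync(first, 0, first.Length);

        // A later read finds the data already consumed by the first one.
        int secondCount = await stream.ReadAsync(second, 0, second.Length);

        // The first read still completed and still holds the data.
        int firstCount = await pending;
        return (firstCount, secondCount);
    }

    public static async Task Main()
    {
        var (first, second) = await RunAsync();
        // prints "first read: 4 bytes, second read: 0 bytes"
        Console.WriteLine($"first read: {first} bytes, second read: {second} bytes");
    }
}
```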
I understand you want to relay data from client to server and from server to client. So why not start two async methods concurrently, each of which handles one of the two relay directions? That removes the need for WhenAny and all that complicated logic. You need to throw this away.
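A minimal sketch of that structure, assuming the same roles as in the question (client and server are the two NetworkStreams, rx_stream and tx_stream, plus one log stream per direction). RelayAsync and ProxyAsync are hypothetical names. Each relay owns its own buffer, reads, writes to both the peer and the log, and only then reads again. Each NetworkStream is read by one relay and written by the other, which NetworkStream supports without extra locking as long as there is a single reader and a single writer.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class Proxy
{
    // Hypothetical helper: copies source -> destination and source -> log
    // until source reports end-of-stream (ReadAsync returns 0).
    public static async Task<long> RelayAsync(Stream source, Stream destination, Stream log, int bufferSize = 4096)
    {
        var buffer = new byte[bufferSize]; // one buffer per direction
        long total = 0;
        int bytesRead;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            // Both writes finish before the buffer is reused by the next read.
            await Task.WhenAll(
                destination.WriteAsync(buffer, 0, bytesRead),
                log.WriteAsync(buffer, 0, bytesRead));
            total += bytesRead;
        }
        return total;
    }

    // Hypothetical wiring: both directions run concurrently; no WhenAny bookkeeping.
    public static async Task<(long FromClient, long FromServer)> ProxyAsync(
        Stream client, Stream server, Stream clientLog, Stream serverLog)
    {
        Task<long> clientToServer = RelayAsync(client, server, clientLog);
        Task<long> serverToClient = RelayAsync(server, client, serverLog);
        await Task.WhenAll(clientToServer, serverToClient);
        return (clientToServer.Result, serverToClient.Result);
    }

    public static async Task Main()
    {
        // MemoryStreams stand in for one direction of the connection.
        var client = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
        var server = new MemoryStream();
        var log = new MemoryStream();
        long copied = await RelayAsync(client, server, log, bufferSize: 2);
        Console.WriteLine($"{copied} bytes relayed, {log.Length} bytes logged"); // 5 bytes relayed, 5 bytes logged
    }
}
```

One design point the sketch leaves open: when one direction reaches end-of-stream, the other relay may keep waiting. A real proxy would typically close both connections (or shut down the send side of the peer socket) at that point, so that ProxyAsync can complete.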
Q1 in code: What happens if both streams have new data at EXACTLY the same time?
You do not need the answer to that question. You must handle the completion of all reads that you start no matter when they complete. Else, you lose data. Maybe you were assuming that non-complete outstanding reads were (somehow) cancelled and only one read was actually "taking"?! That is not the case. All reads complete. There is no way to cancel one (without dropping the data).
Q2 in code: Is there any chance of the WriteAsync() taking too long and losing the stored-buffer?
Not sure what you mean. If a timeout occurs you need a strategy for dealing with that. Usually, you'd log the error and shut down.
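One way to put a bound on a write is sketched below, assuming a fixed timeout is acceptable: race the write against Task.Delay with Task.WhenAny, and treat a timeout as fatal for the connection (log, then close). CompletesWithinAsync is a hypothetical helper. When the timeout wins, the write itself is not cancelled; closing the streams is what eventually ends it, which fits the "log the error and shut down" approach.

```csharp
using System;
using System.Threading.Tasks;

static class Timeouts
{
    // Hypothetical helper: returns true if `write` finished within `timeout`,
    // false if the timeout elapsed first. The write is NOT cancelled on timeout.
    public static async Task<bool> CompletesWithinAsync(Task write, TimeSpan timeout)
    {
        Task winner = await Task.WhenAny(write, Task.Delay(timeout));
        if (winner != write)
            return false;
        await write; // propagate any exception from the write itself
        return true;
    }

    public static async Task Main()
    {
        // A task that never completes stands in for a stalled network write.
        Task stalled = new TaskCompletionSource<bool>().Task;
        bool ok = await CompletesWithinAsync(stalled, TimeSpan.FromMilliseconds(100));
        if (!ok)
        {
            Console.WriteLine("write timed out: log the error and close both connections");
        }
    }
}
```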