Tags: c#, sockets, asynchronous, async-await, tcpclient

ReadAsync() on two NetworkStreams - Am I using "await" correctly?


I'm using two ReadAsync() calls, and Task.WhenAny() to handle two NetworkStreams (TcpClient).

Can the following await-based code miss any incoming data?

  • Q1 in code: What happens if both streams have new data at EXACTLY the same time?
  • Q2 in code: Is there any chance of the WriteAsync() taking too long and losing the stored-buffer?
  • Q3: Is there a "better" way to tackle this?

I am writing a piece of code that is intended to act as a man-in-the-middle filter of a TCP stream (to later allow filtering/monitoring of certain packets).

The generalised logic should be:

  • Client establishes connection to filter, then filter establishes new connection to chosen server
  • If any data arrives from the client, save it and send it to the server
  • If any data arrives from the server, save it and send it to the client

Error handling exists (where listed). Have I missed anything important?


I used the following answer to a question about ".Net 4.5 Async Feature for Socket Programming" as a starting point:

var read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
var read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);

try
{
  while (true)
  {
     Task<int> read_task_occurred;
     try
     {
        read_task_occurred = await Task.WhenAny(read_task_from_client, read_task_from_server);
            //Q1: What happens if both streams have new data at EXACTLY the same time?

        if (read_task_occurred.Status != TaskStatus.RanToCompletion)
        {
          // Log the status of the task that failed (Faulted or Canceled).
          Trace.WriteLine(string.Format("[{0}] - Task failure: {1}", ID, read_task_occurred.Status));
          break;
        }
     }
     catch (AggregateException aex)
     {
        // The wrapped exceptions live in InnerExceptions, not in the Data dictionary.
        for (int i = 0; i < aex.InnerExceptions.Count; i++)
        {
          var aex_item = aex.InnerExceptions[i];
          Trace.WriteLine(string.Format("[{0}] - Aggregate failure {1} - {2}", ID, i, aex_item));
        }
        break;
     }

     var bytes_read = read_task_occurred.Result;
     if (read_task_occurred.Result == 0)
     {
        // If a read-operation returns zero, the stream has closed.
        OneStreamHasClosed(read_task_from_client, read_task_from_server, read_task_occurred);
        break;
     }

     if (read_task_occurred == read_task_from_client)
     {
        BytesFromClient += read_task_from_client.Result;
        Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
        await tx_stream.WriteAsync(rx_buffer, 0, bytes_read);
        await FileStream_Incoming.WriteAsync(rx_buffer, 0, bytes_read);
            // Q2: Any chance of the WriteAsync taking too long?
            //    (e.g. rx_buffer begins to be filled again before being written to tx_stream or the filestream)

        read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
     }
     else if (read_task_occurred == read_task_from_server)
     {
        BytesFromServer += read_task_from_server.Result;
        Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
        await rx_stream.WriteAsync(tx_buffer, 0, bytes_read);
        await FileStream_Outgoing.WriteAsync(tx_buffer, 0, bytes_read);

        read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
     }
  }
}
finally
{
  FileStream_Incoming.Close();
  FileStream_Outgoing.Close();
}

So far, this seems to work as expected, capturing and logging multiple streams. However, I'm not certain whether I'm using the await statements safely.

This will later run in multiple threads (possibly one per incoming connection, but that's a separate topic).

Update (to Q2 in code)

By refactoring the original "await tx_stream.Write..." and "await xxx_FileStream.Write..." calls as follows, I believe I have reduced the main race condition at Q2. I'm still not sure whether this is the "best/recommended" solution:

// Code changed to a call to MultiWrite
private void MultiWrite(byte[] buffer, int bytes_read, Stream s1, Stream s2)
{
  Task writer1 = s1.WriteAsync(buffer, 0, bytes_read);
  Task writer2 = s2.WriteAsync(buffer, 0, bytes_read);
  Task.WaitAll(writer1, writer2);
}
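
Note that Task.WaitAll blocks the calling thread until both writes finish. A minimal sketch of a non-blocking variant is shown below, assuming the caller is itself an async method that can await the result. The names MultiWriteSketch and MultiWriteAsync are hypothetical, and MemoryStream stands in for the network and file streams:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class MultiWriteSketch
{
    // Starts both writes, then returns a task that completes once both have finished.
    // The caller should await it before reusing 'buffer' for the next read.
    public static Task MultiWriteAsync(byte[] buffer, int bytes_read, Stream s1, Stream s2)
    {
        Task writer1 = s1.WriteAsync(buffer, 0, bytes_read);
        Task writer2 = s2.WriteAsync(buffer, 0, bytes_read);
        return Task.WhenAll(writer1, writer2);
    }

    static async Task Main()
    {
        var buffer = new byte[] { 1, 2, 3, 4 };
        using (var s1 = new MemoryStream())
        using (var s2 = new MemoryStream())
        {
            // Only the first 3 bytes are "valid", as after a partial read.
            await MultiWriteAsync(buffer, 3, s1, s2);
            Console.WriteLine("{0} {1}", s1.Length, s2.Length); // prints "3 3"
        }
    }
}
```

Because the returned task is awaited before the next ReadAsync() is issued, the buffer cannot be overwritten while either write is still in flight.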

Update 2 (code-testing of await)

I've been told that await does not allow concurrent tasks to run... This puzzles me since I then cannot understand how/why the following could run...

private async Task<char> SimpleTask(char x, int sleep_ms) { return await Task.Run(() => { Console.Write(x); Thread.Sleep(sleep_ms); return x; }); }
internal async void DoStuff()
{
  var a_task = SimpleTask('a', 100);
  var b_task = SimpleTask('b', 250);
  var c_task = SimpleTask('c', 333);

  while (true)
  {
    var write_task_occurred = await Task.WhenAny(a_task, b_task, c_task);
    var char_written = write_task_occurred.Result;
    switch (char_written)
    {
      case 'a': a_task = SimpleTask('a', 100); break;
      case 'b': b_task = SimpleTask('b', 250); break;
      case 'c': c_task = SimpleTask('c', 333); break;
    }
  }
}

The snippet above does run and, as I would expect, produces the following multi-threaded nonsense:

aabacabaacabaacbaaabcaabacaabacabaabacaabacabaacabaacbaabacaabacabaacabaabacaab

Can anyone explain where/why the above approach is wrong and, if so, how it could be improved?
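
One way to see why the snippet runs concurrently: a task starts when the method that creates it is called, not when it is awaited. The sketch below is a hedged illustration rather than part of the original code. The names AwaitDemo, Work, StartThenAwait and AwaitOneByOne are hypothetical, and Task.Delay stands in for real work:

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

static class AwaitDemo
{
    // Hypothetical stand-in for real work: completes after delay_ms.
    static async Task<char> Work(char x, int delay_ms)
    {
        await Task.Delay(delay_ms);
        return x;
    }

    // Both operations are started before either is awaited, so they overlap.
    public static async Task<long> StartThenAwait()
    {
        var sw = Stopwatch.StartNew();
        Task<char> a = Work('a', 300);
        Task<char> b = Work('b', 300);
        await a;
        await b;
        return sw.ElapsedMilliseconds;
    }

    // The second operation is only started after the first has completed.
    public static async Task<long> AwaitOneByOne()
    {
        var sw = Stopwatch.StartNew();
        await Work('a', 300);
        await Work('b', 300);
        return sw.ElapsedMilliseconds;
    }

    static async Task Main()
    {
        Console.WriteLine("started first, then awaited: {0} ms", await StartThenAwait());
        Console.WriteLine("awaited one by one:          {0} ms", await AwaitOneByOne());
    }
}
```

The first figure should be close to the longest single delay and the second close to the sum of the delays. In other words, await limits concurrency only when a call is started and awaited in the same expression.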


Update 3: split logic into two methods

I have integrated the "write to the output stream and a file, and ensure both outputs have the data in 'buffer' before any further Read()" logic, and the code now calls MultiWrite() as per my earlier update to Q2:

As per the suggestion(s) by @usr and @Pekka, I have split the code into two methods as below...

private void ProcessStreams_Good()
{
  Task t1 = CopyClientToServer(), t2 = CopyServerToClient();

  Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}", ID, BytesFromClient, BytesFromServer));
  Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}
private async void ProcessStreams_Broken()
{
  await CopyClientToServer(); await CopyServerToClient();

  Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}\r\n", ID, BytesFromClient, BytesFromServer));
  Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}

private async Task CopyClientToServer()
{
  var bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
  while (bytes_read > 0)
  {
    BytesFromClient += bytes_read; Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
    MultiWrite(rx_buffer, bytes_read, tx_stream, FileStream_FromClient);
    bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
  }
}
private async Task CopyServerToClient()
{
  var bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
  while (bytes_read > 0)
  {
    BytesFromServer += bytes_read; Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
    MultiWrite(tx_buffer, bytes_read, rx_stream, FileStream_FromServer);
    bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
  }
}

Yes, I am aware of the reason why ProcessStreams_Broken() fails and ProcessStreams_Good() works as expected.

Q: This new code is slightly neater, but is it any "better" ?


Late Update (after question closed)

After the question closed, I came across a Best Practices for async/await link which was quite helpful.


Solution

  • await and WhenAny do not start any operation. They merely wait for a running operation to complete. All reads that you have started will complete eventually and data will be taken from the streams. This is true whether you observe the result or not.

    I understand you want to relay data from client to server and from server to client. So why not start two async methods concurrently, each of which handles one of the two relay directions? That removes the need for WhenAny and all that complicated logic. You need to throw this away.

    Q1 in code: What happens if both streams have new data at EXACTLY the same time?

    You do not need the answer to that question. You must handle the completion of all reads that you start no matter when they complete. Else, you lose data. Maybe you were assuming that non-complete outstanding reads were (somehow) cancelled and only one read was actually "taking"?! That is not the case. All reads complete. There is no way to cancel one (without dropping the data).

    Q2 in code: Is there any chance of the WriteAsync() taking too long and losing the stored-buffer?

    Not sure what you mean. If a timeout occurs you need a strategy for dealing with that. Usually, you'd log the error and shut down.
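
The "two async methods, one per direction" suggestion can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the asker's or answerer's actual code. RelaySketch and RelayAsync are hypothetical names, and MemoryStream objects stand in for the two NetworkStreams and the log files (with real sockets, each NetworkStream would be the source of one direction and the destination of the other):

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

static class RelaySketch
{
    // Copies everything from source to destination and to a log stream,
    // using a buffer owned by this direction only. Returns the byte count.
    public static async Task<long> RelayAsync(Stream source, Stream destination, Stream log, int buffer_size = 4096)
    {
        var buffer = new byte[buffer_size];
        long total = 0;
        int bytes_read;
        // A read that returns zero means the source has reached its end (closed).
        while ((bytes_read = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            // Both writes finish before the buffer is reused by the next read.
            await Task.WhenAll(
                destination.WriteAsync(buffer, 0, bytes_read),
                log.WriteAsync(buffer, 0, bytes_read));
            total += bytes_read;
        }
        return total;
    }

    static async Task Main()
    {
        var from_client = new MemoryStream(Encoding.ASCII.GetBytes("hello server"));
        var from_server = new MemoryStream(Encoding.ASCII.GetBytes("hello client!"));
        var to_server = new MemoryStream();
        var to_client = new MemoryStream();
        var log_incoming = new MemoryStream();
        var log_outgoing = new MemoryStream();

        // Start both directions, then wait for both to finish.
        Task<long> client_to_server = RelayAsync(from_client, to_server, log_incoming);
        Task<long> server_to_client = RelayAsync(from_server, to_client, log_outgoing);
        long[] totals = await Task.WhenAll(client_to_server, server_to_client);

        Console.WriteLine("C={0}, S={1}", totals[0], totals[1]); // prints "C=12, S=13"
    }
}
```

Each direction issues exactly one read at a time and always handles its result, so no completed read is left unobserved, and awaiting Task.WhenAll means the statistics are logged only after both directions have ended.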