Tags: c#, .net, task-parallel-library, async-await, producer-consumer

How to aggregate the data from an async producer and write it to a file?


I'm learning about async/await patterns in C#. Currently I'm trying to solve a problem like this:

  • There is a producer (a hardware device) that generates 1000 packets per second. I need to log this data to a file.

  • The device only has a ReadAsync() method, which returns a single packet at a time.

  • I need to buffer the packets and write them to the file in the order they were generated, once per second.

  • A write should fail if it has not finished by the time the next batch of packets is ready to be written.

So far I have written the code below. It works, but I am not sure it is the best way to solve the problem. Any comments or suggestions? What is the best practice for this kind of producer/consumer problem, where the consumer needs to aggregate the data received from the producer?

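static async Task TestLogger(Device device, int seconds)
{
    const int bufLength = 1000;   // ~1 second of packets at 1000 packets/s
    bool firstIteration = true;
    Task writerTask = null;       // write started for the previous batch

    using (var writer = new StreamWriter("test.log"))
    {
        do
        {
            // Collect one batch in arrival order; each packet is a byte[]
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                // The previous write must be done before the next batch is written
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
                await writerTask; // already completed: rethrows a write error, if any
            }

            // Write the batch on a thread-pool thread while the next batch is read
            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);

        // Wait for the last batch before the writer is disposed
        await writerTask;
    }
}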
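The question's code calls ToHexString without showing it. A minimal sketch of such a helper, assuming it should turn one packet (a byte[]) into uppercase hex digits with no separators; the class name PacketFormatting is hypothetical:

```csharp
using System;

public static class PacketFormatting
{
    // Hypothetical helper: formats a packet as uppercase hex, e.g. { 0x0A, 0xFF } -> "0AFF".
    // BitConverter.ToString produces "0A-FF", so the separators are stripped.
    public static string ToHexString(byte[] packet)
    {
        if (packet == null) throw new ArgumentNullException(nameof(packet));
        return BitConverter.ToString(packet).Replace("-", string.Empty);
    }
}
```

On .NET 5 and later, Convert.ToHexString(packet) returns the same uppercase string directly.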

Solution

  • You could use the following idea, provided the criterion for a flush is the number of packets (1000 per batch). I have not tested it. It uses Stephen Cleary's AsyncProducerConsumerQueue<T> (from his Nito.AsyncEx library), featured in this question.

    AsyncProducerConsumerQueue<byte[]> _queue;
    Stream _stream;
    
    // producer
    async Task ReceiveAsync(CancellationToken token)
    {
        while (true)
        {
            var list = new List<byte>();
            while (true)
            {
                token.ThrowIfCancellationRequested();
                var packet = await _device.ReadAsync(token);
                list.Add(packet);
                if (list.Count == 1000)
                    break;
            }
            // push next batch
            await _queue.EnqueueAsync(list.ToArray(), token);
        }
    }
    
    // consumer
    async Task LogAsync(CancellationToken token)
    {
        Task previousFlush = Task.FromResult(0); 
        CancellationTokenSource cts = null;
        while (true)
        {
            token.ThrowIfCancellationRequested();
            // get next batch
            var nextBatch = await _queue.DequeueAsync(token);
            if (!previousFlush.IsCompleted)
            {
                cts.Cancel(); // cancel the previous flush if not ready
                throw new Exception("failed to flush on time.");
            }
            await previousFlush; // it's completed, observe for any errors
            cts?.Dispose();      // the previous flush no longer uses its token
            // start flushing
            cts = CancellationTokenSource.CreateLinkedTokenSource(token);
            previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Length, cts.Token);
        }
    }
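The producer's inner loop above (read packets until a batch of 1000 is full, then hand the batch over) can be pulled out into a helper. This sketch makes two assumptions that differ from the code above: a packet is a byte[], as in the question's byte[bufLength][] buffer, and the device read is passed in as a delegate (readPacketAsync, a hypothetical stand-in for _device.ReadAsync), so the batching logic can be exercised without hardware.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PacketBatcher
{
    // Reads exactly batchSize packets, in arrival order, into one batch.
    // readPacketAsync is a hypothetical stand-in for the device's ReadAsync.
    public static async Task<byte[][]> ReadBatchAsync(
        Func<CancellationToken, Task<byte[]>> readPacketAsync,
        int batchSize,
        CancellationToken token)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new byte[batchSize][];
        for (int i = 0; i < batchSize; i++)
        {
            token.ThrowIfCancellationRequested();
            batch[i] = await readPacketAsync(token);
        }
        return batch;
    }
}
```

With this shape the queue would carry byte[][] batches, and the consumer would convert each batch to bytes (for example with ToHexString and Encoding.ASCII) before calling Stream.WriteAsync.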
    

    If you would rather not fail the logger, but instead cancel the late flush and proceed to the next batch, you can do so with a minimal change to this code.
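One way to make that change, sketched under assumptions: instead of throwing, the consumer cancels the late flush and then awaits it before starting the next write, because a Stream generally does not support overlapping writes. Cancellation is only a request: a write that ignores the token still runs to completion, and a write that honours it may leave a partial batch in the file. The dequeueAsync delegate (a hypothetical stand-in for _queue.DequeueAsync that returns null when no more batches will arrive), the LenientLogger class and the lateFlushes counter are all hypothetical.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class LenientLogger
{
    // Variant of LogAsync that cancels a late flush instead of failing.
    // Returns how many flushes were still running when the next batch arrived.
    public static async Task<int> LogAsync(
        Func<CancellationToken, Task<byte[]>> dequeueAsync,
        Stream stream,
        CancellationToken token)
    {
        Task previousFlush = Task.CompletedTask;
        CancellationTokenSource cts = null;
        int lateFlushes = 0;

        while (true)
        {
            token.ThrowIfCancellationRequested();
            byte[] nextBatch = await dequeueAsync(token);
            if (nextBatch == null)
                break; // no more batches

            if (!previousFlush.IsCompleted)
            {
                lateFlushes++;
                cts.Cancel(); // ask the late flush to stop
            }

            try
            {
                // Wait until the previous flush finishes or stops, so two writes never
                // overlap on the same stream; real write errors still propagate.
                await previousFlush;
            }
            catch (OperationCanceledException) when (!token.IsCancellationRequested)
            {
                // We cancelled the late flush ourselves: drop it and carry on.
            }

            cts?.Dispose();
            cts = CancellationTokenSource.CreateLinkedTokenSource(token);
            previousFlush = stream.WriteAsync(nextBatch, 0, nextBatch.Length, cts.Token);
        }

        await previousFlush; // make sure the last batch is written
        cts?.Dispose();
        return lateFlushes;
    }
}
```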

    In response to @l3arnon's comment:

    "1. A packet is not a byte, it's byte[]. 2. You haven't used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow. 4. You await previousFlush for errors just after you throw an exception which makes that line redundant. etc. In short: I think the possible added value doesn't justify this very complicated solution."
    1. "A packet is not a byte, it's byte[]" - A packet is a byte, this is obvious from the OP's code: buffer[i] = await device.ReadAsync(). Then, a batch of packets is byte[].
    2. "You haven't used the OP's ToHexString." - The goal was to show how to use Stream.WriteAsync, which natively accepts a cancellation token, instead of StreamWriter.WriteLineAsync, which (in the .NET Framework of the time) doesn't accept one. It's trivial to use ToHexString with Stream.WriteAsync and still take advantage of cancellation support:

      var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
          Environment.NewLine);
      previousFlush = _stream.WriteAsync(hexBytes, 0, hexBytes.Length, cts.Token);
      
    3. "AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow" - I don't think this is an established fact. However, if the OP is concerned about it, he can use a regular (unbounded) BlockingCollection<T>, whose Add doesn't block the producer thread. It's OK to block the consumer thread while waiting for the next batch, because writing is done in parallel. By contrast, your TPL Dataflow version carries one redundant CPU- and lock-intensive operation: moving data from the producer pipeline to the writer pipeline with logAction.Post(packet), packet by packet. My code doesn't do that.

    4. "You await previousFlush for errors just after you throw an exception which makes that line redundant." - This line is not redundant. Perhaps you're missing this point: previousFlush.IsCompleted is also true when previousFlush.IsFaulted or previousFlush.IsCanceled is true. So await previousFlush is needed there to observe any error on the completed task (e.g., a write failure), which would otherwise go unobserved.
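Points 3 and 4 above can be combined in a sketch of the BlockingCollection<T> alternative. Assumptions: each batch is the byte[] the producer would otherwise enqueue, the producer calls Add for every full batch and CompleteAdding when it stops, and the consumer runs on its own thread, since GetConsumingEnumerable blocks while waiting. As in point 4, the previous flush is first checked with IsCompleted and then observed with GetAwaiter().GetResult(), which does not block for a completed task and rethrows a faulted or canceled write. The class name is hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingCollectionLogger
{
    // Consumer loop: blocks this thread while waiting for the next batch,
    // while each write proceeds asynchronously.
    public static void Consume(BlockingCollection<byte[]> batches, Stream stream, CancellationToken token)
    {
        Task previousFlush = Task.CompletedTask;

        foreach (byte[] batch in batches.GetConsumingEnumerable(token))
        {
            if (!previousFlush.IsCompleted)
                throw new TimeoutException("Failed to flush on time.");

            // Completed does not mean successful: a faulted or canceled write
            // is rethrown here instead of being lost.
            previousFlush.GetAwaiter().GetResult();

            previousFlush = stream.WriteAsync(batch, 0, batch.Length, token);
        }

        previousFlush.GetAwaiter().GetResult(); // wait for the final batch
    }
}
```

The consumer could be started with Task.Factory.StartNew(() => BlockingCollectionLogger.Consume(batches, stream, token), TaskCreationOptions.LongRunning), so that blocking does not tie up a thread-pool thread.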