Tags: .net, asynchronous, minio

Processing stream data with the producer-consumer pattern and a single synchronous producer


I have an application with the following workflow.

  1. Users upload a row-based JSON file (each line is one record) via MinIO.
  2. Then they send a request to the application to process it.

The application then starts downloading the data as a stream, using a method with the following signature:

Task GetObjectAsync(string bucketName, string objectName, Action<Stream> callback)
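For context, with that signature the download is kicked off roughly like this (the client variable and the bucket/object names below are placeholders, not from the question):

```csharp
// Hypothetical invocation matching the signature above.
await minio.GetObjectAsync("my-bucket", "records.jsonl", stream => ProcessLine(stream));
```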

The callback method I use does this:

void ProcessLine(Stream s)
{
    using(var streamReader = new StreamReader(s))
    {
        while(!streamReader.EndOfStream)
        {
            var line = streamReader.ReadLine(); // note that I can't use ReadLineAsync here
            var obj = DeserializeLine(line);
            // some other operations
            database.Store(obj);
            // there is an alternative StoreAsync() which I can't use either
        }
    }
}

This works well as long as I don't need the async versions of these methods and the files are relatively small.

Unfortunately, I need to be prepared for the case where there is a single but very large file (20 GB, or anything that doesn't fit in memory; imagine a huge dataset).

To handle that, I decided to implement a producer-consumer queue, populated from the callback action and then drained by a number of workers.

I used ConcurrentQueue<MyObject> as the data structure and the following callback:

void PopulateQueue(Stream s)
{
    using(var streamReader = new StreamReader(s))
    {
        while(!streamReader.EndOfStream)
        {
            var line = streamReader.ReadLine();
            var obj = DeserializeLine(line);
            _queue.Enqueue(obj); // _queue is a private field of type ConcurrentQueue<MyObject>
        }
    }
}

The workers process items in the same manner as in the original version, but with the async methods.

The problem arises when the producer populates the queue much faster than the consumers can process (dequeue) the data. The queue keeps growing and, as expected, eventually eats up all the memory, since the file is very large.

An obvious solution is to limit the number of records in the queue. But I don't know how to do that in a synchronous callback. In an asynchronous one, I would use await Task.Delay(100) whenever the queue had too many records.
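For comparison, here is a sketch of the throttling I have in mind, which would only work if the callback could be async (MaxQueueSize is a made-up constant, not part of my current code):

```csharp
// Hypothetical async variant of the callback; MaxQueueSize is an assumed limit.
async Task PopulateQueueAsync(Stream s)
{
    using (var streamReader = new StreamReader(s))
    {
        while (!streamReader.EndOfStream)
        {
            // Back off while the consumers catch up.
            while (_queue.Count > MaxQueueSize)
                await Task.Delay(100);

            var line = await streamReader.ReadLineAsync();
            _queue.Enqueue(DeserializeLine(line));
        }
    }
}
```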

According to this article, I should avoid Task.Wait() because of its negative impact on performance and the possibility of deadlock.

I have read several of Stephen Cleary's articles about async best practices in .NET. Unfortunately, my takeaway from them is that in this case there is no correct way to call async methods from a synchronous callback, and I have a bad feeling about using Thread.Sleep() or busy-waiting.

Do you have any advice on how to use the producer-consumer pattern here without violating async guidelines, or on solving this issue in another way?

Thank you.

NOTE: I thought about splitting the file into fixed-size chunks at upload time, but that has its own pitfalls.


Solution

  • As @Hans Passant pointed out in the comments, there is a BlockingCollection<T> which solves the problem.

    It has a bounded capacity; when the limit is reached, Add blocks the producer until the number of items decreases.
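A minimal sketch of that fix, reusing the names from the question (MyObject, DeserializeLine, database) and an assumed capacity of 1000:

```csharp
// Shared between the callback (producer) and the workers (consumers).
// The bounded capacity of 1000 is an arbitrary choice for illustration.
private readonly BlockingCollection<MyObject> _items =
    new BlockingCollection<MyObject>(boundedCapacity: 1000);

void PopulateQueue(Stream s)
{
    using (var streamReader = new StreamReader(s))
    {
        while (!streamReader.EndOfStream)
        {
            var line = streamReader.ReadLine();
            var obj = DeserializeLine(line);
            _items.Add(obj); // blocks synchronously while the collection is full
        }
    }
    _items.CompleteAdding(); // tells consumers that no more items will arrive
}

// Each worker drains the collection. GetConsumingEnumerable blocks while the
// collection is empty and completes once CompleteAdding has been called.
async Task ConsumeAsync()
{
    foreach (var obj in _items.GetConsumingEnumerable())
    {
        await database.StoreAsync(obj);
    }
}
```

Because Add blocks the callback thread rather than awaiting, it fits the synchronous callback without calling any async method from sync code, which is exactly the constraint in the question.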