Tags: c#, .net, data-structures, large-data

Data Structures & Techniques for operating on large data volumes (1 million records and more)


A WPF .NET 4.5 app that I have been developing, initially to work on small data volumes, now works on much larger data volumes, in the region of 1 million records and more, and of course I started running out of memory. The data comes from an MS SQL database and needs to be loaded into a local data structure for processing. Because this data is then transformed / processed / referenced by code in the CLR, continuous and uninterrupted data access is required; however, not all of the data has to be loaded into memory straight away, only when it is actually accessed. As a small example, an Inverse Distance Interpolator uses this data to produce interpolated maps, and all of the data needs to be passed to it for continuous grid generation.

I have re-written some parts of the app for processing data, such as loading only x rows at any given time and implementing a sliding-window approach to data processing, which works (a sketch of the idea follows below). However, doing this for the rest of the app will require some time investment, and I wonder if there is a more robust and standard way of approaching this design problem (there has to be, I am not the first one)?
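
For illustration, here is a minimal sketch of that sliding-window idea (the sketch is generic and illustrative, not code from the actual app): the source is streamed lazily and only the current window is ever held in memory.

    using System.Collections.Generic;

    public static class SlidingWindow
    {
        // Streams the source lazily and yields one fixed-size window at a
        // time; at most windowSize items are held in memory at any moment.
        public static IEnumerable<IReadOnlyList<T>> Windows<T>(
            this IEnumerable<T> source, int windowSize)
        {
            var window = new Queue<T>(windowSize);
            foreach (var item in source)
            {
                window.Enqueue(item);
                if (window.Count > windowSize)
                    window.Dequeue();              // drop the oldest item
                if (window.Count == windowSize)
                    yield return window.ToArray(); // snapshot of this window
            }
        }
    }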

tl;dr: Does C# provide any data structures or techniques for accessing large amounts of data in an uninterrupted manner, so that it behaves like an IEnumerable but the data is not in memory until it is actually accessed or required, or is it completely up to me to manage memory usage? My ideal would be a structure that automatically implements a buffer-like mechanism, loading more data as and when it is accessed and freeing memory from data that has already been accessed and is no longer of interest. Some kind of DataTable with an internal buffer, maybe?


Solution

  • As far as iterating through a data set that is too large to fit in memory goes, you can use a producer-consumer model. I used something like this when I was working with a custom data set that contained billions of records, about 2 terabytes of data in total.

    The idea is to have a single class that contains both producer and consumer. When you create a new instance of the class, it spins up a producer thread that fills a constrained concurrent queue. And that thread keeps the queue full. The consumer part is the API that lets you get the next record.

    You start with a shared concurrent queue. I like the .NET BlockingCollection for this.

    Here's an example that reads a text file and maintains a queue of 10,000 text lines.

    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;

    public class TextFileLineBuffer
    {
        private const int QueueSize = 10000;
        private BlockingCollection<string> _buffer = new BlockingCollection<string>(QueueSize);
        private CancellationTokenSource _cancelToken;
        private StreamReader _reader;
    
        public TextFileLineBuffer(string filename)
        {
            // File is opened here so that any exception is thrown on the calling thread. 
            _reader = new StreamReader(filename);
            _cancelToken = new CancellationTokenSource();
            // start task that reads the file
            Task.Factory.StartNew(ProcessFile, TaskCreationOptions.LongRunning);
        }
    
        public string GetNextLine()
        {
            if (_buffer.IsCompleted)
            {
                // The buffer is empty because the file has been read
                // and all lines returned.
                // You can either call this an error and throw an exception,
                // or you can return null.
                return null;
            }
    
            // If there is a record in the buffer, it is returned immediately.
            // Otherwise, Take does a non-busy wait.
    
        // You might want to catch the OperationCanceledException here (and the
        // InvalidOperationException that Take throws if adding completes while
        // you are waiting) and return null rather than letting the exception escape.
    
            return _buffer.Take(_cancelToken.Token);
        }
    
        private void ProcessFile()
        {
            while (!_reader.EndOfStream && !_cancelToken.Token.IsCancellationRequested)
            {
                var line = _reader.ReadLine();
                try
                {
                    // This will block if the buffer already contains QueueSize records.
                    // As soon as a space becomes available, this will add the record
                    // to the buffer.
                    _buffer.Add(line, _cancelToken.Token);
                }
            catch (OperationCanceledException)
            {
                // Cancellation was requested; the while condition will
                // see it and exit the loop on the next pass.
                }
            }
            _buffer.CompleteAdding();
        }
    
        public void Cancel()
        {
            _cancelToken.Cancel();
        }
    }
    

    That's the bare bones of it. You'll want to add a Dispose method that will make sure that the thread is terminated and that the file is closed.
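
    For completeness, here is one shape that Dispose might take. This is a sketch that assumes the class is declared as implementing IDisposable and that the Task returned by StartNew is saved in a _task field; neither is shown in the code above.

    public void Dispose()
    {
        // Ask the producer to stop, then wait until it has finished
        // before releasing the file handle it is reading from.
        _cancelToken.Cancel();
        try
        {
            _task.Wait();
        }
        catch (AggregateException)
        {
            // The producer faulted or observed the cancellation; either
            // way it is done with the file.
        }
        _reader.Dispose();
        _buffer.Dispose();
        _cancelToken.Dispose();
    }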

    I've used this basic approach to good effect in many different programs. You'll have to do some analysis and testing to determine the optimum buffer size for your application. You want something large enough to keep up with the normal data flow and also handle bursts of activity, but not so large that it exceeds your memory budget.
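
    One small variation of my own (not part of the code above) that makes that tuning easier: take the queue size as a constructor argument instead of hard-coding the constant.

    // Variation: let the caller tune the buffer size per use site.
    // (Drop the field initializer on _buffer if you adopt this.)
    public TextFileLineBuffer(string filename, int queueSize = 10000)
    {
        _buffer = new BlockingCollection<string>(queueSize);
        _reader = new StreamReader(filename);
        _cancelToken = new CancellationTokenSource();
        Task.Factory.StartNew(ProcessFile, TaskCreationOptions.LongRunning);
    }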

    IEnumerable modifications

    If you want to support IEnumerable<T>, you have to make some minor modifications. I'll extend my example to support IEnumerable<string>.

    First, you have to change the class declaration:

    public class TextFileLineBuffer : IEnumerable<string>
    

    Then, you have to implement GetEnumerator:

    public IEnumerator<String> GetEnumerator()
    {
        foreach (var s in _buffer.GetConsumingEnumerable())
        {
            yield return s;
        }
    }
    
    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
    

    With that, you can initialize the thing and then pass it to any code that expects an IEnumerable<string>. So it becomes:

    var items = new TextFileLineBuffer(filename);
    DoSomething(items);
    
    void DoSomething(IEnumerable<string> list)
    {
        foreach (var s in list)
            Console.WriteLine(s);
    }
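
    And because it is now an IEnumerable<string>, it also composes with LINQ (add using System.Linq;). A small usage sketch of my own: the query below streams through the buffer, so the whole file never sits in memory at once.

    var items = new TextFileLineBuffer(filename);
    // Count consumes the sequence to the end, so the producer thread
    // runs to completion and the file is read exactly once.
    var longLineCount = items.Count(s => s.Length > 80);
    Console.WriteLine(longLineCount);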