Tags: c#, .net, filestream, streamwriter, producer-consumer

How to periodically flush c# FileStream to the disk?


Context:

I am implementing a logging mechanism for a Web API project that writes serialized objects to a file from multiple methods, which in turn is read by an external process (nxLog to be more accurate). The application is hosted on IIS and uses 18 worker processes. The app pool is recycled once a day. The expected load on the services that will incorporate the logging methods is 10,000 req/s. In short, this is a classic producer/consumer problem with multiple producers (the methods that produce logs) and one consumer (the external process that reads from the log files). Update: each process uses multiple threads as well.

I used BlockingCollection to store data (and solve the race condition) and a long running task that writes the data from the collection to the disk.

To write to the disk I am using a StreamWriter and a FileStream.
Because the write frequency is almost constant (as I said, 10,000 writes per second), I decided to keep the streams open for the entire lifetime of the application pool and periodically write logs to the disk. I rely on the app pool recycle and my DI framework to dispose of my logger daily. Also note that this class will be a singleton, because I didn't want to have more than one thread dedicated to writing from my thread pool.

Apparently the FileStream object will not write to the disk until it is disposed. Now I don't want the FileStream to wait for an entire day until it writes to the disk. The memory required to hold all those serialized objects would be tremendous, not to mention that any crash of the application or the server would cause data loss or a corrupted file.

Now my question:

How can I have the underlying streams (FileStream and StreamWriter) write to the disk periodically without disposing them? My initial assumption was that it would write to the disk once the FileStream exceeds its buffer size, which is 4K by default.
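(For reference, a minimal sketch of the behavior I am after; the file name, interval, and timer setup here are illustrative only. Calling Flush() on the StreamWriter pushes its buffer into the FileStream, and FileStream.Flush(true) hands the bytes to the disk, all without disposing either stream.)

```csharp
using System;
using System.IO;
using System.Threading;

class PeriodicFlushSketch
{
    static void Main()
    {
        var fs = new FileStream("demo.log", FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        var sw = new StreamWriter(fs) { AutoFlush = false };
        var gate = new object();

        // Flush every 10 seconds without disposing the streams.
        using var timer = new Timer(_ =>
        {
            lock (gate)          // don't interleave with a concurrent WriteLine
            {
                sw.Flush();      // StreamWriter buffer -> FileStream
                fs.Flush(true);  // flushToDisk: true also bypasses the OS cache
            }
        }, null, dueTime: 10_000, period: 10_000);

        lock (gate) { sw.WriteLine("log entry"); }

        // ... application keeps running; streams stay open until shutdown ...
        sw.Dispose();            // disposing the writer also disposes the FileStream
    }
}
```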

UPDATE: The inconsistencies mentioned in the answer have been fixed.

Code:

public class EventLogger: IDisposable, ILogger
{
    private readonly BlockingCollection<List<string>> _queue;
    private readonly Task _consumerTask;
    private FileStream _fs;
    private StreamWriter _sw;
    private string _logFilePath;
    public EventLogger()
    {            
        OpenFile();
        _queue = new BlockingCollection<List<string>>(50);
        _consumerTask = Task.Factory.StartNew(Write, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
    private void OpenFile()
    {
        _fs?.Dispose();
        _sw?.Dispose();            
        _logFilePath = $"D:\\Log\\log{DateTime.Now.ToString("yyyyMMdd")}{System.Diagnostics.Process.GetCurrentProcess().Id}.txt";
        _fs = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
        _sw = new StreamWriter(_fs);
    }
    public void Dispose()
    {            
         _queue?.CompleteAdding();
         _consumerTask?.Wait();            
         _sw?.Dispose();
         _fs?.Dispose();
         _queue?.Dispose();            

    }
    public void Log(List<string> list)
    {
        try
        {               
            _queue.TryAdd(list, 100);               

        }
        catch (Exception e)
        {
            LogError(LogLevel.Error, e);
        }
    }
    private void Write()
    {
        foreach (List<string> items in _queue.GetConsumingEnumerable())
        {               
            items.ForEach(item =>
            {                    
                _sw?.WriteLine(item);                    
            });
        }

    }
}

Solution

  • There are a few "inconsistencies" with your question.

    The application is hosted on IIS and uses 18 worker processes


    _logFilePath = $"D:\Log\log{DateTime.Now.ToString(yyyyMMdd)}{System.Diagnostic.Process.GetCurrentProcess().Id}.txt";


    writes serialized objects to a file from multiple methods

    Putting all of this together, you seem to have a single threaded situation as opposed to a multi-threaded one. And since there is a separate log per process, there is no contention problem or need for synchronization. What I mean to say is, I don't see why the BlockingCollection is needed at all. It's possible that you forgot to mention that there are multiple threads within your web process. I will make that assumption here.

    Another problem is that your code does not compile:

    1. The class name is Logger but the EventLogger function looks like a constructor.
    2. There is some more incorrect syntax with strings, etc.

    Putting all that aside, if you really have a contention situation and want to write to the same log via multiple threads or processes, your class seems to have most of what you need. I have modified your class to do some more things. The chief items to note are:

    1. Fixed all the syntax errors making assumptions
    2. Added a timer, which will call the flush periodically. This will need a lock object so as to not interrupt the write operation
    3. Used an explicit buffer size in the StreamWriter constructor. You should heuristically determine what size works best for you. Also, you should disable AutoFlush from StreamWriter so you can have your writes hit the buffer instead of the file, providing better performance.

    Below is the code with the changes

    public class EventLogger : IDisposable, ILogger {
        private readonly BlockingCollection<List<string>> _queue;
        private readonly Task _consumerTask;
        private FileStream _fs;
        private StreamWriter _sw;
        private System.Timers.Timer _timer;
        private object streamLock = new object();
    
        private const int MAX_BUFFER = 16 * 1024;      // 16K
        private const int FLUSH_INTERVAL = 10 * 1000;  // 10 seconds
    
        public EventLogger() {
            OpenFile();
            _queue = new BlockingCollection<List<string>>(50);
            _consumerTask = Task.Factory.StartNew(Write, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            SetupFlushTimer();
        }
    
        void SetupFlushTimer() {
            _timer = new System.Timers.Timer(FLUSH_INTERVAL);
            _timer.AutoReset = true;
            _timer.Elapsed += TimedFlush;
            _timer.Start();
        }
    
        void TimedFlush(Object source, System.Timers.ElapsedEventArgs e) {
            lock (streamLock) {
                _sw?.Flush();
            }
        }
    
        private void OpenFile() {
            _fs?.Dispose();
            _sw?.Dispose();
            var _logFilePath = $"D:\\Log\\log{DateTime.Now.ToString("yyyyMMdd")}{System.Diagnostics.Process.GetCurrentProcess().Id}.txt";
            _fs = new FileStream(_logFilePath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite);
            _sw = new StreamWriter(_fs, Encoding.Default, MAX_BUFFER); // TODO: use the correct encoding here
            _sw.AutoFlush = false;
        }
    
        public void Dispose() {
            _timer.Elapsed -= TimedFlush;
            _timer.Dispose();
    
            _queue?.CompleteAdding();
            _consumerTask?.Wait();
            _sw?.Dispose();
            _fs?.Dispose();
            _queue?.Dispose();
    
        }
        public void Log(List<string> list) {
            try {
                _queue.TryAdd(list, 100);
    
            } catch (Exception e) {
                LogError(LogLevel.Error, e);
            }
        }
    
        private void Write() {
            foreach (List<string> items in _queue.GetConsumingEnumerable()) {
                lock (streamLock) {
                    items.ForEach(item => {
                        _sw?.WriteLine(item);
                    });
                }
            }
    
        }
    }
    

    EDIT:
    There are 4 factors controlling the performance of this mechanism, and it is important to understand their relationship. The example below will hopefully make it clear.

    Let's say

    • average size of List<string> is 50 Bytes
    • Calls/sec is 10,000
    • MAX_BUFFER is 1024 * 1024 Bytes (1 Meg)

    You are producing 500,000 bytes of data per second, so a 1 MB buffer can hold only about 2 seconds' worth of data. That is, even if FLUSH_INTERVAL is set to 10 seconds, the buffer will auto-flush roughly every 2 seconds (on average) when it runs out of buffer space.

    Also remember that increasing the MAX_BUFFER blindly will not help, since the actual flush operation will take longer due to the bigger buffer size.

    The main thing to understand is that when there is a difference between the incoming data rate (to your EventLogger class) and the outgoing data rate (to the disk), you will either need an infinitely sized buffer (assuming a continuously running process), or you will have to slow down your average incoming rate to match your average outgoing rate.
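    A quick back-of-the-envelope check of the numbers above (all values are the example's assumed inputs, not measurements); the effective flush period is the smaller of the timer interval and the time it takes to fill the buffer:

    ```csharp
    using System;

    class FlushMath
    {
        static void Main()
        {
            double bytesPerEntry = 50;          // assumed avg serialized size of a List<string>
            double callsPerSec = 10_000;
            double maxBuffer = 1024 * 1024;     // 1 MiB
            double flushIntervalSec = 10;       // timer period

            double bytesPerSec = bytesPerEntry * callsPerSec;   // 500,000 B/s
            double fillTimeSec = maxBuffer / bytesPerSec;       // ~2.1 s
            double effectiveSec = Math.Min(flushIntervalSec, fillTimeSec);

            Console.WriteLine($"buffer fills in {fillTimeSec:F1}s; effective flush every {effectiveSec:F1}s");
        }
    }
    ```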