Search code examples
c#multithreadingstreammemorystream

MemoryStream have one thread write to it and another read


This is how I write to a stream then read from it using 1 thread:

        System.IO.MemoryStream ms = new System.IO.MemoryStream();

        // write to it
        ms.Write(new byte[] { 1, 2, 3, 4, 5, 6, 7 }, 0, 7);

        // go to the begining
        ms.Seek(0, System.IO.SeekOrigin.Begin);

        // now read from it
        byte[] myBuffer = new byte[7];
        ms.Read(myBuffer, 0, 7);

Now I was wondering if it is possible to write to the memory-stream from one thread and read that stream from a separate thread.


Solution

  • You can't use a Stream with seeking capabilities from 2 threads simultaneous since a Stream is state full. e.g. A NetworkStream has 2 channels, one for reading and one for writing and therefore can't support seeking.

    If you need seeking capabilities, you need to create 2 streams, one for reading and one for writing respectively. Else you can simply create a new Stream type which allows reading and writing from a underlying memory stream by taking exclusive access to the underlying stream and restore its write/read position. A primitive example of that would be:

    class ProducerConsumerStream : Stream
    {
        private readonly MemoryStream innerStream;
        private long readPosition;
        private long writePosition;
    
        public ProducerConsumerStream()
        {
            innerStream = new MemoryStream();
        }
    
        public override bool CanRead { get { return true;  } }
    
        public override bool CanSeek { get { return false; } }
    
        public override bool CanWrite { get { return true; } }
    
        public override void Flush()
        {
            lock (innerStream)
            {
                innerStream.Flush();
            }
        }
    
        public override long Length
        {
            get 
            {
                lock (innerStream)
                {
                    return innerStream.Length;
                }
            }
        }
    
        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
    
        public override int Read(byte[] buffer, int offset, int count)
        {
            lock (innerStream)
            {
                innerStream.Position = readPosition;
                int red = innerStream.Read(buffer, offset, count);
                readPosition = innerStream.Position;
    
                return red;
            }
        }
    
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }
    
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }
    
        public override void Write(byte[] buffer, int offset, int count)
        {
            lock (innerStream)
            {
                innerStream.Position = writePosition;
                innerStream.Write(buffer, offset, count);
                writePosition = innerStream.Position;
            }
        }
    }