Apply GZip compression to a stream used by a method that pulls data


Prerequisites

I have a stream rawStream and a method which takes a stream and reads it to the end, let's say something like this:

Task UploadFile(Stream stream) { ... }

Currently this method is successfully being used like this:

await UploadFile(rawStream);

What I want to do

Now I need to apply GZip compression to that stream. I wish I could write something like this:

using (var compressedStream = new GZipStream(rawStream, CompressionLevel.Fastest))
{
    await UploadFile(compressedStream);
}

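But this doesn't work: in compression mode, GZipStream is write-only. The stream passed to its constructor is the output that the compressed data gets written to, so the data flows the wrong way round for a consumer that wants to read from it.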
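
A minimal sketch that shows the problem. The empty MemoryStream is only a stand-in for rawStream, and the class and method names are made up for the illustration:

```csharp
using System;
using System.IO;
using System.IO.Compression;

public static class CompressionDirectionDemo
{
    public static void Main()
    {
        // Stand-in for rawStream (assumption: any writable stream behaves the same here).
        using (var rawStream = new MemoryStream())
        // In compression mode the wrapped stream is the destination of the compressed bytes.
        using (var compressedStream = new GZipStream(rawStream, CompressionLevel.Fastest, true))
        {
            Console.WriteLine(compressedStream.CanRead);  // False: nothing can be pulled out of it
            Console.WriteLine(compressedStream.CanWrite); // True: data has to be pushed into it
        }
    }
}
```

So a method like UploadFile, which only calls Read on the stream it is given, cannot consume a compressing GZipStream directly.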

Question

How can I wrap my original stream in a compressing stream and still let my consuming function pull the data out of the stream?


Note

The above example is actually simplified, because I also need to apply base64 encoding. So what I actually want is something like this:

using (var compressedStream = new GZipStream(rawStream, CompressionLevel.Fastest))
using (var encodedStream = new CryptoStream(compressedStream, new ToBase64Transform(), CryptoStreamMode.Read))
{
    await UploadFile(encodedStream);
}

But I guess if someone can explain to me how it works for the compression part, I can figure out how to implement the entire chain.


Solution

  • None of the approaches presented so far really convinced me. So I went ahead and wrote the following custom stream, which allows pulling the GZipped and Base64-encoded data out of the stream.

    I did some tests and it seems to work fine.

    I think this pattern can be useful in other contexts as well for turning a "push pipeline" into a "pull pipeline".

    using System;
    using System.IO;
    using System.IO.Compression;
    using System.Security.Cryptography;

    public sealed class GzipBase64Stream : Stream
    {
    
        #region constructor / cleanup
    
        public GzipBase64Stream(Stream inputStream)
        {
            try
            {
                InputStream = inputStream;
                ToBase64Transform = new ToBase64Transform();
                OutputStream = new MemoryStream();
                Base64Stream = new CryptoStream(OutputStream, ToBase64Transform, CryptoStreamMode.Write);
                GzipStream = new GZipStream(Base64Stream, CompressionLevel.Fastest, true);
            }
            catch
            {
                Cleanup();
                throw;
            }
        }
    
        private void Cleanup()
        {
            GzipStream?.Dispose();
            Base64Stream?.Dispose();
            OutputStream?.Dispose();
            ToBase64Transform?.Dispose();
            InputStream?.Dispose();
        }
    
        #endregion
    
        #region private variables
    
        private bool EndOfInputStreamReached = false;
    
        private readonly Stream InputStream;
        private readonly ToBase64Transform ToBase64Transform;
        private readonly MemoryStream OutputStream;
        private readonly CryptoStream Base64Stream;
        private readonly GZipStream GzipStream;
    
        #endregion
    
        #region stream overrides
    
        public override bool CanRead => true;
    
        public override bool CanSeek => false;
    
        public override bool CanWrite => false;
    
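        // The stream is not seekable, so the length is unknown rather than 0
        public override long Length => throw new NotSupportedException();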
    
        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    
        public override void SetLength(long value) => throw new NotSupportedException();
    
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    
        // Flushing a read-only stream is valid and should simply do nothing
        public override void Flush() { }
    
        public override int Read(byte[] buffer, int offset, int count)
        {
    
            while ((OutputStream.Position >= OutputStream.Length) && !EndOfInputStreamReached)
            {
                // No unread data available in the output buffer
                // -> release memory of output buffer and read new data from the source and feed through the pipeline
                OutputStream.SetLength(0);
                var inputBuffer = new byte[1024];
                var readCount = InputStream.Read(inputBuffer, 0, inputBuffer.Length);
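                if (readCount == 0)
                {
                    EndOfInputStreamReached = true;
                    // Flush() does not emit the final deflate block and the gzip trailer; Dispose() does
                    GzipStream.Dispose();
                    // GzipStream was created with leaveOpen = true, so the last (padded) base64 block
                    // must be written explicitly, otherwise up to two trailing bytes are lost
                    Base64Stream.FlushFinalBlock();
                }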
                else
                {
    
                    GzipStream.Write(inputBuffer, 0, readCount);
                }
                OutputStream.Position = 0;
            }
    
            return OutputStream.Read(buffer, offset, count);
        }
    
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            if (disposing)
                Cleanup();
        }
    
        #endregion
    
    }
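
As a rough sketch of the same push-to-pull idea in a more general form: the class below is a hypothetical helper (PushToPullStream is not part of .NET) that takes a factory for the write-side pipeline instead of hard-coding GZip and Base64. It assumes the pipeline is built so that it leaves the internal buffer open when disposed, and it only demonstrates the GZip case.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;

// Hypothetical helper: exposes a write-based ("push") pipeline as a readable ("pull") stream.
public sealed class PushToPullStream : Stream
{
    private readonly Stream _source;
    private readonly MemoryStream _buffer = new MemoryStream();
    private readonly Stream _pipeline;
    private readonly byte[] _chunk = new byte[4096];
    private bool _finished;

    // buildPipeline receives the internal buffer and returns the stream to write into.
    // Assumption: the returned pipeline must NOT close the buffer when it is disposed.
    public PushToPullStream(Stream source, Func<Stream, Stream> buildPipeline)
    {
        _source = source;
        _pipeline = buildPipeline(_buffer);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Refill until the buffer holds unread bytes or the source is exhausted.
        while (_buffer.Position >= _buffer.Length && !_finished)
        {
            _buffer.SetLength(0); // drop bytes that were already handed out
            int n = _source.Read(_chunk, 0, _chunk.Length);
            if (n > 0)
            {
                _pipeline.Write(_chunk, 0, n);
            }
            else
            {
                _finished = true;
                _pipeline.Dispose(); // disposing makes the pipeline emit its trailing bytes
            }
            _buffer.Position = 0; // rewind so the new output can be read
        }
        return _buffer.Read(buffer, offset, count);
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _pipeline.Dispose();
            _buffer.Dispose();
            _source.Dispose();
        }
        base.Dispose(disposing);
    }
}

public static class PushToPullDemo
{
    public static void Main()
    {
        byte[] original = Enumerable.Range(0, 100000).Select(i => (byte)(i % 7)).ToArray();

        var compressed = new MemoryStream();
        using (var pull = new PushToPullStream(
            new MemoryStream(original),
            buf => new GZipStream(buf, CompressionLevel.Fastest, true)))
        {
            pull.CopyTo(compressed); // the consumer pulls compressed bytes
        }

        // Decompress again to check that nothing was lost on the way.
        compressed.Position = 0;
        var roundTrip = new MemoryStream();
        using (var gunzip = new GZipStream(compressed, CompressionMode.Decompress))
        {
            gunzip.CopyTo(roundTrip);
        }

        // Prints True if the round trip preserved the data.
        Console.WriteLine(roundTrip.ToArray().SequenceEqual(original));
    }
}
```

The key design point is the same as in GzipBase64Stream: Read keeps pushing chunks of the source through the write pipeline until some output shows up in the buffer, and disposes the pipeline once the source is exhausted so that the trailing bytes are emitted.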