Prerequisites
I have a stream, rawStream, and a method that takes a stream and reads it to the end, something like this:
Task UploadFile(Stream stream) { ... }
Currently this method is successfully being used like this:
await UploadFile(rawStream);
What I want to do
Now I need to apply GZip compression to that stream. I wish I could write something like this:
using (var compressedStream = new GZipStream(rawStream, CompressionLevel.Fastest))
{
await UploadFile(compressedStream);
}
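But this doesn't work: in compression mode, the stream passed to the GZipStream constructor is the destination that the compressed data is written to, not a source to read from. The GZipStream itself is write-only in that mode, so the data flows the wrong way round.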
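To make the direction problem concrete, here is a minimal sketch. The class and method names are hypothetical, chosen only for this illustration. It shows that compressing means writing into the GZipStream, while only decompressing lets you read from it:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

public static class GZipDirectionDemo
{
    // Compression: data is WRITTEN into the GZipStream; the compressed bytes land in 'destination'.
    public static byte[] Compress(byte[] raw)
    {
        using (var destination = new MemoryStream())
        {
            using (var gzip = new GZipStream(destination, CompressionLevel.Fastest, true))
            {
                gzip.Write(raw, 0, raw.Length);
            } // disposing the GZipStream writes the remaining data and the gzip footer
            return destination.ToArray();
        }
    }

    // Decompression: data is READ from the GZipStream; here the wrapped stream is the source.
    public static byte[] Decompress(byte[] compressed)
    {
        using (var source = new MemoryStream(compressed))
        using (var gzip = new GZipStream(source, CompressionMode.Decompress))
        using (var result = new MemoryStream())
        {
            gzip.CopyTo(result);
            return result.ToArray();
        }
    }

    public static void Main()
    {
        var raw = Encoding.UTF8.GetBytes("hello hello hello hello");
        var roundTripped = Decompress(Compress(raw));
        Console.WriteLine(Encoding.UTF8.GetString(roundTripped));
    }
}
```

Buffering the whole compressed result in a MemoryStream like this would work for small inputs, but it defeats the purpose of streaming for large ones, which is why I am looking for a pull-based wrapper.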
Question
How can I wrap my original stream in a compressing stream and still let my consuming function pull the data out of the stream?
Note
The above example is actually simplified, because I also need to apply base64 encoding. So what I actually want is something like this:
using (var compressedStream = new GZipStream(rawStream, CompressionLevel.Fastest))
using (var encodedStream = new CryptoStream(compressedStream, new ToBase64Transform(), CryptoStreamMode.Read))
{
await UploadFile(encodedStream);
}
But I guess if someone can explain to me how it works for the compression part, I can figure out how to implement the entire chain.
None of the approaches presented so far really convinced me, so I went ahead and wrote the following custom stream, which allows pulling the GZipped and Base64-encoded data out of the stream.
I did some tests and it seems to work fine.
I think this pattern can be useful in other contexts as well for turning a "push pipeline" into a "pull pipeline".
public sealed class GzipBase64Stream : Stream
{
#region constructor / cleanup
public GzipBase64Stream(Stream inputStream)
{
try
{
InputStream = inputStream;
ToBase64Transform = new ToBase64Transform();
OutputStream = new MemoryStream();
Base64Stream = new CryptoStream(OutputStream, ToBase64Transform, CryptoStreamMode.Write);
GzipStream = new GZipStream(Base64Stream, CompressionLevel.Fastest, true);
}
catch
{
Cleanup();
throw;
}
}
private void Cleanup()
{
GzipStream?.Dispose();
Base64Stream?.Dispose();
OutputStream?.Dispose();
ToBase64Transform?.Dispose();
InputStream?.Dispose();
}
#endregion
#region private variables
private bool EndOfInputStreamReached = false;
private readonly Stream InputStream;
private readonly ToBase64Transform ToBase64Transform;
private readonly MemoryStream OutputStream;
private readonly CryptoStream Base64Stream;
private readonly GZipStream GzipStream;
#endregion
#region stream overrides
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException(); // length is unknown for a non-seekable stream
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
public override void SetLength(long value) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override void Flush() { } // no-op: flushing a read-only stream is valid and should not throw
public override int Read(byte[] buffer, int offset, int count)
{
while ((OutputStream.Position >= OutputStream.Length) && !EndOfInputStreamReached)
{
// No unread data available in the output buffer
// -> reset the output buffer (its capacity is reused), then read new data from the source and feed it through the pipeline
OutputStream.SetLength(0);
var inputBuffer = new byte[1024];
var readCount = InputStream.Read(inputBuffer, 0, inputBuffer.Length);
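if (readCount == 0)
{
EndOfInputStreamReached = true;
// Disposing the GZipStream writes the remaining compressed data and the gzip footer
// (Flush() alone does not finish the stream).
GzipStream.Dispose();
// GzipStream was created with leaveOpen = true, so the Base64 stream must be finalized explicitly;
// otherwise a trailing partial 3-byte block and its padding would never reach the output buffer.
Base64Stream.FlushFinalBlock();
}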
else
{
GzipStream.Write(inputBuffer, 0, readCount);
}
OutputStream.Position = 0;
}
return OutputStream.Read(buffer, offset, count);
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
Cleanup();
}
#endregion
}
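One way to test a stream like this is to reverse the pipeline and compare the result with the original input. The following is only a sketch: the class GzipBase64RoundTrip and its methods are hypothetical names, and EncodeBuffered is a simple in-memory reference encoder used here so that the example runs on its own.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Security.Cryptography;
using System.Text;

public static class GzipBase64RoundTrip
{
    // Reverses the pipeline in pull mode: Base64-decode first, then gunzip.
    public static byte[] Decode(Stream encoded)
    {
        using (var base64 = new CryptoStream(encoded, new FromBase64Transform(), CryptoStreamMode.Read))
        using (var gzip = new GZipStream(base64, CompressionMode.Decompress))
        using (var result = new MemoryStream())
        {
            gzip.CopyTo(result);
            return result.ToArray();
        }
    }

    // Reference encoder that buffers everything in memory (push direction).
    public static byte[] EncodeBuffered(byte[] raw)
    {
        using (var output = new MemoryStream())
        {
            using (var base64 = new CryptoStream(output, new ToBase64Transform(), CryptoStreamMode.Write))
            using (var gzip = new GZipStream(base64, CompressionLevel.Fastest))
            {
                gzip.Write(raw, 0, raw.Length);
            } // gzip is disposed first (footer written), then base64 (final block and padding)
            return output.ToArray(); // ToArray still works after the MemoryStream is closed
        }
    }

    public static void Main()
    {
        var raw = Encoding.UTF8.GetBytes("some test payload, some test payload");
        using (var encoded = new MemoryStream(EncodeBuffered(raw)))
        {
            Console.WriteLine(Encoding.UTF8.GetString(Decode(encoded)));
        }
    }
}
```

To check the custom stream itself, pass new GzipBase64Stream(rawStream) to Decode instead of the buffered MemoryStream and compare the decoded bytes with the original input. Inputs whose compressed length is not a multiple of 3 are worth including, since they exercise the Base64 padding at the end.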