
Seamless writing of uncompressed output when using the Rust gzp crate


I am using the Rust crate gzp for multi-threaded compression of output data, and I have run into one limitation with its interface. In my application I would like to optionally produce uncompressed output, especially when the output is going to stdout. With the current gzp interface this is awkward, because finish() must be called on a ParCompress object before it goes out of scope.

I believe the typical solution for this would be something similar to:

let mut writer: Box<dyn Write + Send> = if compress {
    Box::new(
        ParCompressBuilder::<Gzip>::new()
            .num_threads(processes)?
            .from_writer(out_file),
    )
} else {
    Box::new(BufWriter::with_capacity(1024 * 1024, out_file))
};

However, this doesn't work, because finish() is not part of the Write trait and therefore cannot be called on a Box<dyn Write + Send>. Is there a pattern that lets me write data seamlessly through either a ParCompress or a BufWriter, depending on the state of my compress flag?


Solution

  • The lazy solution is to use the either crate:

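    use either::Either;
    use gzp::{deflate::Gzip, par::compress::ParCompressBuilder, ZWriter}; // ZWriter provides finish()
    use std::io::{BufWriter, Write};

    let mut writer = if compress {
        Either::Right(
            ParCompressBuilder::<Gzip>::new()
                .num_threads(processes)?
                .from_writer(out_file),
        )
    } else {
        Either::Left(BufWriter::with_capacity(1024 * 1024, out_file))
    };
    writer.write_all(…)?; // Conveniently, Either implements Write if both Left and Right implement it
    match writer {
        // The compressor has to be finished explicitly
        Either::Right(mut gzp) => gzp.finish()?,
        // Flush explicitly so that write errors are not silently dropped
        Either::Left(mut bw) => bw.flush()?,
    }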
    

    A bit more verbose, but better encapsulated, is to define your own enum:

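    use gzp::{deflate::Gzip, par::compress::{ParCompress, ParCompressBuilder}, ZWriter}; // ZWriter provides finish()
    use std::io::{BufWriter, Write};

    enum MaybeGzp<T: Write> {
        Gzp(ParCompress<Gzip>),
        Plain(BufWriter<T>),
    }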
    
    impl<T: Write + Send + 'static> MaybeGzp<T> {
        pub fn new(file: T, compress: Option<usize>) -> Self {
            match compress {
                Some(processes) => MaybeGzp::Gzp(
                    ParCompressBuilder::new()
                        .num_threads(processes)
                        .expect("Insane thread count")
                        .from_writer(file),
                ),
                None => MaybeGzp::Plain(BufWriter::with_capacity(1024 * 1024, file)),
            }
        }
    
        pub fn finish(self) -> anyhow::Result<()> /* lazy error handling */ {
            Ok(match self {
                MaybeGzp::Gzp(mut gzp) => gzp.finish()?,
                MaybeGzp::Plain(mut bw) => bw.flush()?,
                // Could also do nothing in the Plain case, but BufWriter's Drop
                // ignores flush errors, so it is better to surface them here
            })
        }
    }
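The point about BufWriter's Drop ignoring errors can be checked with a small std-only sketch. FailingSink is a hypothetical writer that rejects every write (think of a full disk or a closed pipe). The buffered bytes only reach it when the buffer is flushed, so only the explicit flush() reports the failure.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical sink that rejects every write (e.g. full disk, closed pipe).
struct FailingSink;

impl Write for FailingSink {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::Other, "disk full"))
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Buffer some bytes, then flush explicitly: the sink's error is returned.
fn write_then_flush(data: &[u8]) -> io::Result<()> {
    let mut bw = BufWriter::new(FailingSink);
    bw.write_all(data)?; // small input fits in the buffer, so this succeeds
    bw.flush() // the buffered bytes hit the sink here and the error surfaces
}

/// Same writes, but rely on Drop: BufWriter tries to flush and discards the error.
fn write_then_drop(data: &[u8]) -> io::Result<()> {
    let mut bw = BufWriter::new(FailingSink);
    bw.write_all(data)?;
    Ok(()) // bw is dropped here; the failed write goes unnoticed
}
```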
    

    on which you can then trivially implement Write:

    impl<T: Write> MaybeGzp<T> {
        // Both variants implement Write, so each arm coerces to &mut dyn Write
        fn as_write(&mut self) -> &mut dyn Write {
            match self {
                MaybeGzp::Gzp(gzp) => gzp,
                MaybeGzp::Plain(plain) => plain,
            }
        }
    }
    
    impl<T: Write> Write for MaybeGzp<T> {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.as_write().write(buf)
        }
    
        fn flush(&mut self) -> std::io::Result<()> {
            self.as_write().flush()
        }
    }
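The as_write delegation can be tried without gzp. In this std-only sketch, MaybeBuffered and its finish method are hypothetical names, and a Vec<u8> plays the part of the output file. It also shows that forwarding write and flush is enough for writeln! to work, because write_all and write_fmt are provided methods of Write that are built on top of write.

```rust
use std::io::{self, BufWriter, Write};

/// Std-only analogue of MaybeGzp: buffered or direct output into a Vec.
enum MaybeBuffered {
    Buffered(BufWriter<Vec<u8>>),
    Direct(Vec<u8>),
}

impl MaybeBuffered {
    /// Borrow whichever variant is active as a trait object.
    fn as_write(&mut self) -> &mut dyn Write {
        match self {
            MaybeBuffered::Buffered(bw) => bw,
            MaybeBuffered::Direct(v) => v,
        }
    }

    /// Flush if needed and hand back the collected bytes.
    fn finish(self) -> io::Result<Vec<u8>> {
        match self {
            MaybeBuffered::Buffered(bw) => bw.into_inner().map_err(|e| e.into_error()),
            MaybeBuffered::Direct(v) => Ok(v),
        }
    }
}

impl Write for MaybeBuffered {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.as_write().write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.as_write().flush()
    }
}

/// Writes a few lines with writeln!, which goes through the default write_fmt.
fn render(mut out: MaybeBuffered) -> io::Result<Vec<u8>> {
    for i in 1..=3 {
        writeln!(out, "line {}", i)?;
    }
    out.finish()
}
```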
    

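If you would rather keep the Box<dyn Write + Send> shape from the question, another option is a small trait of your own that extends Write with a consuming finish. In this std-only sketch, FinishWrite is a hypothetical trait and MockCompress is a hypothetical stand-in for ParCompress that writes a trailer on finish. For the real type, the impl would call gzp's finish() instead, and its error would need converting to io::Error (or the trait could use a different error type).

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical trait: a writer that must be explicitly finished.
trait FinishWrite: Write {
    /// Consume the writer, flushing or finalizing it and reporting any error.
    fn finish(self: Box<Self>) -> io::Result<()>;
}

/// Plain output: finishing just means flushing the buffer.
impl<W: Write> FinishWrite for BufWriter<W> {
    fn finish(mut self: Box<Self>) -> io::Result<()> {
        self.flush()
    }
}

/// Hypothetical stand-in for ParCompress; writes a trailer on finish.
struct MockCompress<W: Write> {
    inner: W,
}

impl<W: Write> Write for MockCompress<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.inner.write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

impl<W: Write> FinishWrite for MockCompress<W> {
    fn finish(mut self: Box<Self>) -> io::Result<()> {
        self.inner.write_all(b"<trailer>")?;
        self.inner.flush()
    }
}

/// Choose the writer at runtime, as in the question, but behind the custom trait.
fn open_output<'a, W: Write + Send + 'a>(compress: bool, out: W) -> Box<dyn FinishWrite + Send + 'a> {
    if compress {
        Box::new(MockCompress { inner: out })
    } else {
        Box::new(BufWriter::with_capacity(1024 * 1024, out))
    }
}

/// Write through the boxed writer, then finish it via the trait object.
fn write_output(data: &[u8], compress: bool) -> io::Result<Vec<u8>> {
    let mut out: Vec<u8> = Vec::new();
    let mut writer = open_output(compress, &mut out);
    writer.write_all(data)?;
    writer.finish()?;
    Ok(out)
}
```

Because finish takes self: Box<Self>, the trait stays object-safe, and the writer cannot be used again once it has been finished.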