
In Rust, how can I build a processing pipeline from multiple libraries that each take a Read and a Write?


Working with text streams, I'm using the aho_corasick crate, which allows efficient replacement of many strings in linear time on streamed text. Its interface for streams consists mainly of the following function:

aho_corasick::ahocorasick::AhoCorasick
pub fn stream_replace_all<R, W, B>(&self, rdr: R, wtr: W, replace_with: &[B]) -> io::Result<()>
where
    R: io::Read,
    W: io::Write,
    B: AsRef<[u8]>,

Now, there is a second processing step that I'd like to apply to each chunk. My library also takes a reader and a writer and applies its own processing (although I can change the interface if necessary).

The question is: is it possible to run both functions as a pipeline, so that each chunk is processed twice? In other words, I need the Writer of step 1 to act as the Reader of step 2.

I've tried adding an FnMut parameter to my library so that each chunk can be preprocessed or postprocessed, but I can't figure out how to make AhoCorasick's function above use it.


Solution

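  • If your second step can handle the data in chunks, you can write a small helper that connects the steps: a Write implementation that passes every chunk written by step 1 to step 2, which in turn writes to the final output: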

    use std::io::{self, Read, Write};
    
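    /// Adapter that acts as the `Write` for step 1 and forwards every chunk to step 2.
    pub struct Intermediate<Step2, W> {
        step2: Step2,
        final_output: W,
    }
    
    impl<Step2: FnMut(&[u8], &mut W) -> io::Result<()>, W: Write> Write for Intermediate<Step2, W> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Step 2 always consumes the whole chunk, so report all bytes as written.
            self.write_all(buf)?;
            Ok(buf.len())
        }
    
        fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
            // Hand the chunk produced by step 1 to step 2, which writes to the final output.
            (self.step2)(buf, &mut self.final_output)
        }
    
        fn flush(&mut self) -> io::Result<()> {
            self.final_output.flush()
        }
    }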
    
    pub fn two_steps<
        R: Read,
        W: Write,
        Step1: FnOnce(R, Intermediate<Step2, W>) -> io::Result<()>,
        Step2: FnMut(&[u8], &mut W) -> io::Result<()>,
    >(
        input: R,
        step1: Step1,
        step2: Step2,
        final_output: W,
    ) -> io::Result<()> {
        let intermediate = Intermediate {
            step2,
            final_output,
        };
        step1(input, intermediate)
    }
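
A minimal usage sketch of the helper above, using only the standard library. The two steps are hypothetical stand-ins: `dashes_to_spaces` plays the role of a library function that takes a reader and a writer (as `stream_replace_all` does), and `to_upper` plays the role of a chunk-based second step. `demo` is likewise just an illustrative name. The helper definitions are repeated so that the block compiles on its own.

```rust
use std::io::{self, Read, Write};

// Same helper as above, repeated so this sketch compiles on its own.
pub struct Intermediate<Step2, W> {
    step2: Step2,
    final_output: W,
}

impl<Step2: FnMut(&[u8], &mut W) -> io::Result<()>, W: Write> Write for Intermediate<Step2, W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_all(buf)?;
        Ok(buf.len())
    }

    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        (self.step2)(buf, &mut self.final_output)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.final_output.flush()
    }
}

pub fn two_steps<R, W, Step1, Step2>(
    input: R,
    step1: Step1,
    step2: Step2,
    final_output: W,
) -> io::Result<()>
where
    R: Read,
    W: Write,
    Step1: FnOnce(R, Intermediate<Step2, W>) -> io::Result<()>,
    Step2: FnMut(&[u8], &mut W) -> io::Result<()>,
{
    step1(input, Intermediate { step2, final_output })
}

// Hypothetical step 1: a reader/writer style function that replaces '-' with ' '.
// The tiny buffer forces the input to be processed in several chunks.
fn dashes_to_spaces<R: Read, W: Write>(mut rdr: R, mut wtr: W) -> io::Result<()> {
    let mut buf = [0u8; 8];
    loop {
        let n = rdr.read(&mut buf)?;
        if n == 0 {
            break;
        }
        for b in &mut buf[..n] {
            if *b == b'-' {
                *b = b' ';
            }
        }
        wtr.write_all(&buf[..n])?;
    }
    wtr.flush()
}

// Hypothetical step 2: a chunk-based function that uppercases ASCII letters.
fn to_upper<W: Write>(chunk: &[u8], out: &mut W) -> io::Result<()> {
    out.write_all(&chunk.to_ascii_uppercase())
}

// Runs both steps over an in-memory input and returns the final output.
pub fn demo() -> io::Result<Vec<u8>> {
    let input: &[u8] = b"a-b-c--stream-text";
    let mut output: Vec<u8> = Vec::new();
    // Passing `&mut output` keeps ownership of the Vec here, so it can be returned.
    two_steps(input, |rdr, wtr| dashes_to_spaces(rdr, wtr), to_upper, &mut output)?;
    Ok(output)
}
```

Calling `demo()` returns the bytes of `"A B C  STREAM TEXT"`. With aho_corasick, step 1 would presumably be a closure that wraps the `stream_replace_all` call in the same way. Note that step 2 sees chunks whose boundaries are chosen by step 1, so a second step that matches multi-byte patterns would need to handle matches that span two chunks.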