How to copy data from a stream while also forwarding a stream


I am using hyper 0.12 to build a proxy service. When receiving a response body from the upstream server I want to forward it back to the client ASAP, and save the contents in a buffer for later processing.

So I need a function that:

  • takes a Stream (a hyper::Body, to be precise)
  • returns a Stream that is functionally identical to the input stream
  • also returns some sort of Future<Item = Vec<u8>, Error = ...> that is resolved with the buffered contents of the input stream, when the output stream is completely consumed

I can't for the life of me figure out how to do this.

I guess the function I'm looking for will look something like this:

type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let body2 = ... // ???
    let buffer = body.fold(Vec::<u8>::new(), |mut buf, chunk| {
        buf.extend_from_slice(&chunk);
        // ...somehow send this chunk to body2 also?
    });
    (body2, buffer) // no trailing semicolon, so the tuple is returned
}

Below is what I have tried, and it works until send_data() fails (obviously).

type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let (mut sender, body2) = hyper::Body::channel();
    let consume =
        body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
            buf.extend_from_slice(&chunk);

            // What to do if this fails?
            if sender.send_data(chunk).is_err() {}
            Box::new(future::ok(buf))
        });

    // No trailing semicolon here, otherwise the function returns ().
    (body2, Box::new(consume))
}
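
The failure mode can be reproduced without hyper. The sketch below is only an analogue, not hyper's API: it assumes that send_data() behaves roughly like a non-blocking send on a bounded channel, which hands the chunk back when the channel cannot accept it right now. The function name forward_without_waiting and the capacity parameter are made up for illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: forward `chunks` through a bounded channel without
/// waiting for capacity, while buffering a copy of every chunk.
/// Returns (buffered bytes, number of chunks that were not forwarded).
fn forward_without_waiting(chunks: Vec<Vec<u8>>, capacity: usize) -> (Vec<u8>, usize) {
    // `_receiver` stays alive for the whole function but never reads,
    // which models a client that is slower than the upstream server.
    let (sender, _receiver) = sync_channel::<Vec<u8>>(capacity);
    let mut buffer = Vec::new();
    let mut dropped = 0;

    for chunk in chunks {
        buffer.extend_from_slice(&chunk);
        match sender.try_send(chunk) {
            Ok(()) => {}
            // Channel full: the consumer has not caught up yet.
            Err(TrySendError::Full(_chunk)) => dropped += 1,
            // Receiver gone: nobody will ever read the forwarded data.
            Err(TrySendError::Disconnected(_chunk)) => dropped += 1,
        }
    }

    (buffer, dropped)
}
```

With a capacity of 1 and three chunks, the buffer ends up complete but two chunks never reach the forwarded side. Ignoring the error therefore keeps the copy intact while silently truncating what the client receives.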

However, something tells me I am on the wrong track.

I have found Sink.fanout() which seems like it is what I want, but I do not have a Sink, and I don't know how to construct one. hyper::Body implements Stream but not Sink.


Solution

  • What I ended up doing was implementing a new type of stream that does what I need. This appeared to be necessary because hyper::Body does not implement Sink and hyper::Chunk does not implement Clone (which Sink.fanout() requires), so I cannot use any of the existing combinators.

    First, a struct that holds the state we need, with methods to append a new chunk and to signal that the buffer is complete.

    // futures 0.1 items used by the snippets in this answer
    use futures::{Async, Future, Stream};

    struct BodyClone<T> {
        body: T,
        buffer: Option<Vec<u8>>,
        sender: Option<futures::sync::oneshot::Sender<Vec<u8>>>,
    }
    
    impl BodyClone<hyper::Body> {
        fn flush(&mut self) {
            if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
                // The receiver may already have been dropped; ignore that case.
                let _ = sender.send(buffer);
            }
        }
    
        fn push(&mut self, chunk: &hyper::Chunk) {
            use hyper::body::Payload;
    
            let length = if let Some(buffer) = self.buffer.as_mut() {
                buffer.extend_from_slice(chunk);
                buffer.len() as u64
            } else {
                0
            };
    
            if let Some(content_length) = self.body.content_length() {
                if length >= content_length {
                    self.flush();
                }
            }
        }
    }
    

    Then I implemented the Stream trait for this struct.

    impl Stream for BodyClone<hyper::Body> {
        type Item = hyper::Chunk;
        type Error = hyper::Error;
    
        fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
            match self.body.poll() {
                Ok(Async::Ready(Some(chunk))) => {
                    self.push(&chunk);
                    Ok(Async::Ready(Some(chunk)))
                }
                Ok(Async::Ready(None)) => {
                    self.flush();
                    Ok(Async::Ready(None))
                }
                other => other,
            }
        }
    }
    

    Finally, I defined an extension method on hyper::Body:

    pub type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()> + Send>;
    
    trait CloneBody {
        fn clone_body(self) -> (hyper::Body, BufferFuture);
    }
    
    impl CloneBody for hyper::Body {
        fn clone_body(self) -> (hyper::Body, BufferFuture) {
            let (sender, receiver) = futures::sync::oneshot::channel();
    
            let cloning_stream = BodyClone {
                body: self,
                buffer: Some(Vec::new()),
                sender: Some(sender),
            };
    
            (
                hyper::Body::wrap_stream(cloning_stream),
                Box::new(receiver.map_err(|_| ())),
            )
        }
    }
    

    This can be used as follows:

    let (body, buffer): (hyper::Body, BufferFuture) = body.clone_body();
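
The same pattern can be tried out with nothing but the standard library. The sketch below is a synchronous analogue, not the hyper code above: it assumes an Iterator of byte chunks in place of hyper::Body and a std mpsc channel in place of the oneshot channel, and the names TeeIter, tee and run_example are made up for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical std-only analogue of `BodyClone`: forwards every chunk
/// unchanged and keeps a copy of the bytes in `buffer`.
struct TeeIter<I> {
    inner: I,
    buffer: Option<Vec<u8>>,
    sender: Option<Sender<Vec<u8>>>,
}

impl<I> TeeIter<I> {
    /// Hand the accumulated bytes to the receiver, at most once.
    fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            // The receiver may already be gone; then the copy is just dropped.
            let _ = sender.send(buffer);
        }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for TeeIter<I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        match self.inner.next() {
            Some(chunk) => {
                // Copy the bytes, then pass the chunk on untouched.
                if let Some(buffer) = self.buffer.as_mut() {
                    buffer.extend_from_slice(&chunk);
                }
                Some(chunk)
            }
            None => {
                // Source exhausted: publish the copy.
                self.flush();
                None
            }
        }
    }
}

/// Split `inner` into a forwarding iterator and a receiver for the copy.
fn tee<I: Iterator<Item = Vec<u8>>>(inner: I) -> (TeeIter<I>, Receiver<Vec<u8>>) {
    let (sender, receiver) = channel();
    let iter = TeeIter { inner, buffer: Some(Vec::new()), sender: Some(sender) };
    (iter, receiver)
}

/// Drive the forwarding side to completion, then collect the copy.
fn run_example() -> (Vec<Vec<u8>>, Vec<u8>) {
    let chunks = vec![b"hello ".to_vec(), b"world".to_vec()];
    let (forwarded, copy) = tee(chunks.into_iter());
    let seen: Vec<Vec<u8>> = forwarded.collect();
    let buffered = copy.recv().expect("copy is sent once the source is exhausted");
    (seen, buffered)
}
```

As in the hyper version, the copy only becomes available once the forwarding side has been consumed to the end. If the forwarded iterator is dropped early, the sender is dropped with it and the receiver reports an error instead of a partial buffer.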