Search code examples
rustrust-tokio

from a stream "FramedRead" how to "do something" in every chunk


I would like to display the upload progress of a file using the crate indicatif, I am uploading the file asynchronous using reqwest with something like this:

use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};

let file = File::open(file_path).await?;
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
client.put(url).body(body)

The progress bar is implemented like this:

use indicatif::ProgressBar;

let bar = ProgressBar::new(1000);
for _ in 0..1000 {
    bar.inc(1);
    // ...
}
bar.finish();

How from the stream:

let stream = FramedRead::new(file, BytesCodec::new());
// how on every chunk do X ? 
let body = Body::wrap_stream(stream);

could I call bar.inc(1) on every interaction?

From the docs I see that there is a read_buffer but how to iterate over it in a way that I could use it for calling a custom function or also count the bytes send in the cased I could display "bytes sent" so far for example.


Solution

  • You can use TryStreamExt::inspect_ok, for instance, which will call a closure with a reference to every Ok(item) in the stream when that item is consumed.

    use futures::stream::TryStreamExt;
    use tokio_util::codec::{BytesCodec, FramedRead};
    
    let stream = FramedRead::new(file, BytesCodec::new())
        .inspect_ok(|chunk| {
            // do X with chunk...
        });
    
    let body = Body::wrap_stream(stream);