Search code examples
rustrust-tokioreqwest

how to monitor reqwest client upload progress


for downloading with reqwest and tokio and progress I am using the code below

pub async fn download_file(client: &Client, url: &str, path: &str) -> Result<(), String> {
    // Reqwest setup
    let res = client
        .get(url)
        .send()
        .await
        .or(Err(format!("Failed to GET from '{}'", &url)))?;
    let total_size = res
        .content_length()
        .ok_or(format!("Failed to get content length from '{}'", &url))?;

    // Indicatif setup
    let pb = ProgressBar::new(total_size);
    pb.set_style(ProgressStyle::default_bar()
        .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
        .progress_chars("#>-"));
    pb.set_message(format!("Downloading {}", url));

    // download chunks
    let mut file = File::create(path).or(Err(format!("Failed to create file '{}'", path)))?;
    let mut downloaded: u64 = 0;
    let mut stream = res.bytes_stream();

    while let Some(item) = stream.next().await {
        let chunk = item.or(Err(format!("Error while downloading file")))?;
        file.write(&chunk)
            .or(Err(format!("Error while writing to file")))?;
        let new = min(downloaded + (chunk.len() as u64), total_size);
        downloaded = new;
        pb.set_position(new);
    }

    pb.finish_with_message(format!("Downloaded {} to {}", url, path));
    return Ok(());
}

from the while loop I can set progress and see progressbar like examples here https://github.com/mitsuhiko/indicatif

now I am trying to find make progressbar from upload, but could not find the way to monitor reqwest client, code below is my upload function

pub async fn upload_file(client: &Client, url: &str, path: &str) -> Result<(), String> {
    let f = File::open(path).expect("Unable to open file");

    let total_size = f.metadata().unwrap().len();

    // Indicatif setup
    let pb = ProgressBar::new(total_size);
    pb.set_style(ProgressStyle::default_bar()
        .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
        .progress_chars("#>-"));
    pb.set_message(format!("Posting {}", url));


    let file = tokio::fs::File::open(path).await.unwrap();
    let stream = FramedRead::new(file, BytesCodec::new());


    let res=client
    .post(url)
    .body(Body::wrap_stream(stream))
    .send()
    .await;

    pb.finish_with_message(format!("Uploaded {} to {}", url, path));
    return Ok(());
}

upload works but no progressbar with percent or any indicator. there should have been status monitor, like below

    .post(url)
    .body(Body::wrap_stream(stream))
    .send()
    .monitorStatus(|stat|{
        pb.set_position(stat);
    }).....

you can see working code here https://github.com/ozkanpakdil/rust-examples/blob/5f4965f2b086d07c8294352182639dc75232bb30/download_upload/src/download_file.rs#L43 just uncomment those tests and run cargo test

My question is, how to monitor reqwest client for upload and making a progressbar from it ?


Solution

  • You can create an async_stream and yield chunks of the input to upload:

    let file = tokio::fs::File::open(&input).await.unwrap();
    let total_size = file.metadata().await.unwrap().len();
    let input_ = input.to_string();
    let output_ = output.to_string();
    let mut reader_stream = ReaderStream::new(file);
    
    let mut uploaded = HTTPSHandler::get_already_uploaded(output).await;
    bar.set_length(total_size);
    
    let async_stream = async_stream::stream! {
        while let Some(chunk) = reader_stream.next().await {
            if let Ok(chunk) = &chunk {
                let new = min(uploaded + (chunk.len() as u64), total_size);
                uploaded = new;
                bar.set_position(new);
                if(uploaded >= total_size){
                    bar.finish_upload(&input_, &output_);
                }
            }
            yield chunk;
        }
    };
    

    Then, just wrap the stream when building the Body:

    let _ = reqwest::Client::new()
            .put(output)
            .header("content-type", "application/octet-stream")
            .header("Range", "bytes=".to_owned() + &uploaded.to_string() + "-")
            .header(
                reqwest::header::USER_AGENT,
                reqwest::header::HeaderValue::from_static(CLIENT_ID),
            )
            .body(reqwest::Body::wrap_stream(async_stream))
            .send()
            .await
            .unwrap();
    

    Btw, have a look at the implementation of aim, I've faced similar problems there!