Tags: asynchronous, rust, deadlock, rust-tokio, hyper

Hyper client with tokio File stuck at write()


I am trying to write a fully asynchronous download.

The download itself works fine so far.

With std::fs::File it works fine, but I wanted to try tokio's File to make the code fully async.

If I just download the file and discard the data, it works. But when I use tokio::fs::File to write the data to disk asynchronously, the download gets stuck at random points: sometimes at 1.1 MB, mostly at ~1.6 MB. The total is ~9 MB.

My test URL is https://github.com/Kitware/CMake/releases/download/v3.20.5/cmake-3.20.5.tar.gz

The last output I get is the debug!("Received...") line.

The nearly complete output is:

DEBUG: Temp File: /tmp/26392_1625868800106141_ZhWUtnaD.tmp

DEBUG: add_pem_file processed 133 valid and 0 invalid certs
DEBUG: No cached session for DNSNameRef("github.com")
DEBUG: Not resuming any session
DEBUG: Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
DEBUG: Not resuming
DEBUG: TLS1.3 encrypted extensions: [ServerNameAck, Protocols([PayloadU8([104, 50])])]
DEBUG: ALPN protocol is Some(b"h2")
DEBUG: Ticket saved
DEBUG: Ticket saved
DEBUG: Status: 302 Found
[...]
DEBUG: content-length: 621
DEBUG: Sending warning alert CloseNotify

DEBUG: add_pem_file processed 133 valid and 0 invalid certs
DEBUG: No cached session for DNSNameRef("github-releases.githubusercontent.com")
DEBUG: Not resuming any session
DEBUG: Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
DEBUG: Not resuming
DEBUG: TLS1.3 encrypted extensions: [ServerNameAck, Protocols([PayloadU8([104, 50])])]
DEBUG: ALPN protocol is Some(b"h2")
DEBUG: Ticket saved
DEBUG: Status: 200 OK
[...]
DEBUG: content-length: 9441947

DEBUG: Received 16384 bytes (16384 total)
DEBUG: Written 16384 bytes (16384 total)
DEBUG: Received 9290 bytes (25674 total)
DEBUG: Written 9290 bytes (25674 total)
DEBUG: Received 16384 bytes (42058 total)
DEBUG: Written 16384 bytes (42058 total)
[...]
DEBUG: Received 8460 bytes (1192010 total)
DEBUG: Written 8460 bytes (1192010 total)
DEBUG: Received 8948 bytes (1200958 total)
DEBUG: Written 8948 bytes (1200958 total)
DEBUG: Received 8460 bytes (1209418 total)
DEBUG: Written 8460 bytes (1209418 total)
DEBUG: Received 8948 bytes (1218366 total)
[PROCESS STUCK HERE]

It feels like a deadlock, or something else is blocking the write, but I can't find out what's wrong. Why does this get stuck?

Code:

async fn download_http<P: AsRef<Path>>(url: &Url, localpath: P) -> MyResult<()> {
    let mut uri = hyper::Uri::from_str(url.as_str())?;

    let mut total_read: usize = 0;
    let mut total_written: usize = 0;
    let mut localfile = File::create(localpath).await?;

    // Redirection Limit
    for _ in 0..10 {
        let https = HttpsConnector::with_native_roots();
        let client = Client::builder().build::<_, hyper::Body>(https);
        let mut resp = client.get(uri.clone()).await?;

        let status = resp.status();
        let header = resp.headers();

        debug!("Status: {}", status);
        for (key, value) in resp.headers() {
            debug!("HEADER {}: {}", key, value.to_str().unwrap());
        }

        if status.is_success() {
            // tokio::io::copy(&mut resp.body_mut().data(), &mut localfile).await?;

            let expected_size = header.get("content-length").map(|v| v.to_str().unwrap().parse::<usize>().unwrap());
            
            while let Some(next) = resp.data().await {
                let chunk = next?;
            
                let num_bytes = chunk.len();
                total_read = total_read + num_bytes;
                debug!("Received {} bytes ({} total)", num_bytes, total_read);
            
                // localfile.write_all(&chunk).await?;
                let written = localfile.write(&chunk).await?;
                total_written = total_written + written;
                debug!("Written {} bytes ({} total)", written, total_written);
            
                if total_read != total_written {
                    error!("Could not write all data!");
                }
            
                if expected_size.is_some() && total_read.eq(&expected_size.unwrap()) {
                    return Ok(());
                }
            }

            return Ok(());
        } else if status.is_redirection() {
            let location = header.get("location").unwrap().to_str().unwrap();

            uri = hyper::Uri::from_str(location)?;
        } else {
            let uri_str = uri.to_string();

            return Err(MyError::CustomError(CustomError::from_string(format!("HTTP responded with status {}: {}", status, uri_str))))
        }
    }

    Err(MyError::CustomError(CustomError::from_string(format!("HTTP too many redirections"))))
}
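The loop above compares total_read with total_written because a single `write` call may accept fewer bytes than it was given. That is the documented contract of both std's `Write::write` and tokio's `AsyncWriteExt::write`, and `write_all` exists to keep calling `write` until the whole buffer is consumed. The std-only sketch below shows that contract synchronously (it does not use tokio); `Trickle` and `copy_chunks` are hypothetical names, and the writer artificially accepts at most a few bytes per call.

```rust
use std::io::{self, Write};

/// Hypothetical writer that accepts at most `max_per_call` bytes per `write`
/// call, to imitate a short (partial) write.
struct Trickle {
    data: Vec<u8>,
    max_per_call: usize,
}

impl Write for Trickle {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(self.max_per_call);
        self.data.extend_from_slice(&buf[..n]);
        Ok(n) // may be less than buf.len()
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes every chunk with `write_all` and checks the total against an
/// optional expected length (for example a parsed content-length header).
fn copy_chunks<W: Write>(
    chunks: &[&[u8]],
    out: &mut W,
    expected: Option<usize>,
) -> io::Result<usize> {
    let mut total = 0;
    for &chunk in chunks {
        // write_all loops internally until the whole chunk is accepted.
        out.write_all(chunk)?;
        total += chunk.len();
    }
    out.flush()?;
    match expected {
        Some(n) if n != total => Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            format!("expected {n} bytes, got {total}"),
        )),
        _ => Ok(total),
    }
}
```

With `max_per_call: 4`, a single `write(b"abcdefgh")` returns 4, while `copy_chunks` still writes all 8 bytes through repeated calls. Using `write_all` makes the per-chunk total_written bookkeeping unnecessary. For tokio::fs::File specifically, the tokio docs recommend calling `flush().await` before the file is dropped so that pending background writes complete.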

Crates (incomplete, only the relevant ones):

futures = "0.3"
futures-cpupool = "0.1"
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.22"
rustls = "0.19"
tokio = { version = "1.6", features = ["full"] }
url = "2.2"

As you can see, the download loop matches the example code from the Hyper documentation.

I added the tokio::fs::File writing part.

I added debug output (mostly byte counts) to find out what's going on and where.

The commented-out lines show the ideal approach: using write_all or, if possible, io::copy.

But I can't get it working without it getting stuck.

Could you please advise me where my mistake is?

Thank you very much.


Solution

  • Thanks to @HHK in the comments above.

    He recommended building a minimal, reproducible example. While doing that, I found that the example worked fine.

    So I iteratively added the code from the original project around it.

    The last piece I added was a relic I had not removed when making the project async and learning about async.

    I had a futures::executor::block_on call inside an async function, used to call another async function. This blocked the whole program at random points.

    So I should have built a complete, working piece of code before posting. That would have led me to the original problem and saved me a lot of headaches.

    For the future reader:

    futures = "0.3"
    hyper = { version = "0.14", features = ["full"] }
    hyper-rustls = "0.22"
    rustls = "0.19"
    log = "0.4"
    tokio = { version = "1.6", features = ["full"] }
    url = "2.2"
    

    Code:

    use std::io::{stderr, stdout, Write};
    use std::path::{Path, PathBuf};
    use std::str::FromStr;
    
    use futures::executor::block_on;
    use hyper::body::HttpBody;
    use hyper::Client;
    use hyper_rustls::HttpsConnector;
    use log::{debug, error, LevelFilter, Log, Metadata, Record};
    use tokio::fs::File;
    use tokio::io::AsyncWriteExt;
    use url::Url;
    
    async fn download_http<P: AsRef<Path>>(url: &Url, localpath: P) -> Result<(), ()> {
        let mut uri = hyper::Uri::from_str(url.as_str()).unwrap();
    
        let mut total_read: usize = 0;
        let mut total_written: usize = 0;
        let mut localfile = File::create(localpath).await.unwrap();
    
        // Redirection Limit
        for _ in 0..10 {
            let https = HttpsConnector::with_native_roots();
            let client = Client::builder().build::<_, hyper::Body>(https);
            let mut resp = client.get(uri.clone()).await.unwrap();
    
            let status = resp.status();
            let header = resp.headers();
    
            debug!("Status: {}", status);
            for (key, value) in resp.headers() {
                debug!("HEADER {}: {}", key, value.to_str().unwrap());
            }
    
            if status.is_success() {
                // tokio::io::copy(&mut resp.body_mut().data(), &mut localfile).await.unwrap();
    
                let expected_size = header.get("content-length").map(|v| v.to_str().unwrap().parse::<usize>().unwrap());
    
                while let Some(next) = resp.data().await {
                    let chunk = next.unwrap();
    
                    let num_bytes = chunk.len();
                    total_read = total_read + num_bytes;
                    debug!("Received {} bytes ({} total)", num_bytes, total_read);
    
                    // localfile.write_all(&chunk).await.unwrap();
                    let written = localfile.write(&chunk).await.unwrap();
                    total_written = total_written + written;
                    debug!("Written {} bytes ({} total)", written, total_written);
    
                    if total_read != total_written {
                        error!("Could not write all data!");
                    }
    
                    if expected_size.is_some() && total_read.eq(&expected_size.unwrap()) {
                        return Ok(());
                    }
                }
    
                return Ok(());
            } else if status.is_redirection() {
                let location = header.get("location").unwrap().to_str().unwrap();
    
                uri = hyper::Uri::from_str(location).unwrap();
            } else {
                return Err(());
            }
        }
    
        return Err(());
    }
    
    
    struct Logger;
    
    impl Log for Logger {
        fn enabled(&self, _: &Metadata) -> bool {
            true
        }
    
        fn log(&self, record: &Record) {
            eprintln!("{}: {}", record.level().as_str().to_uppercase(), record.args());
            stdout().flush().unwrap();
            stderr().flush().unwrap();
        }
    
        fn flush(&self) {
            stdout().flush().unwrap();
            stderr().flush().unwrap();
        }
    }
    
    static LOGGER: Logger = Logger;
    
    #[tokio::main]
    async fn main() {
        log::set_logger(&LOGGER).map(move |()| log::set_max_level(LevelFilter::Debug)).unwrap();
    
        let url = Url::parse("https://github.com/Kitware/CMake/releases/download/v3.20.5/cmake-3.20.5.tar.gz").unwrap();
        let localfile = PathBuf::from("/tmp/cmake-3.20.5.tar.gz");
    
        block_on(download_http(&url, &localfile)).unwrap(); // gets stuck at random points
        // download_http(&url, &localfile).await.unwrap(); // works: plain .await instead of block_on
    }
    

    Switching between block_on and a plain .await is what makes the difference.

    Now I can switch back to using write_all and remove my debugging code.
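To see why a stray block_on matters, it helps to look at what such a function does: it runs a future to completion on the calling OS thread and parks that thread whenever the future returns `Pending`. The std-only sketch below is a minimal executor in that spirit; it is not the futures crate's actual code, and `ThreadWaker`, `block_on` and `Delay` are hypothetical names for illustration. `Delay` is completed by a separate helper thread, standing in for work done elsewhere (an I/O driver or a blocking pool).

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Waker that unparks the OS thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal block_on: polls `fut` on the calling thread and parks that
/// thread while the future is pending. Nothing else runs on this thread
/// until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until some other thread calls `wake`. If the only code
            // able to trigger that wake-up needs this very thread, we hang.
            Poll::Pending => thread::park(),
        }
    }
}

/// State shared between `Delay` and the helper thread that completes it.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that becomes ready after `dur`.
struct Delay {
    shared: Arc<Mutex<Shared>>,
}

impl Delay {
    fn new(dur: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let remote = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(dur);
            let mut state = remote.lock().unwrap();
            state.done = true;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        Delay { shared }
    }
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared.lock().unwrap();
        if state.done {
            Poll::Ready("done")
        } else {
            // Remember whom to wake once the helper thread finishes.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
```

For example, `block_on(Delay::new(Duration::from_millis(20)))` parks the calling thread for roughly 20 ms and then returns "done". In the program above, the same kind of call sits inside a task driven by Tokio, so that thread is parked behind Tokio's back; awaiting download_http directly, as in the commented-out line, instead lets `Pending` propagate up to Tokio's own scheduler.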