Async read_to_end results in empty buffer even though number of read bytes is > 0


I'm writing an async (tokio) program that transparently reads from regular files, gzipped files and gzipped tar archives. It works for the first two cases, but I'm having trouble reading from tar files using tokio-tar. The number of bytes read is greater than 0, but the buffer stays empty anyway.

The actual program is quite big so here is some code that presents the problem in an isolated form.

#![feature(let_chains)]
use std::path::Path;

use async_compression::tokio::bufread::GzipDecoder;
use futures::StreamExt;
use tokio::{
    fs::File,
    io::{AsyncReadExt, BufReader},
};
use tokio_tar::{Archive, Entry};

#[derive(Debug)]
pub enum Handle {
    File(BufReader<File>),
    Gz(GzipDecoder<BufReader<File>>),
    TarGz(Entry<Archive<GzipDecoder<BufReader<File>>>>),
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // prepare the reader
    let f = File::open("./archive.tar.gz").await?;
    let gz = GzipDecoder::new(BufReader::new(f));
    let mut archive = Archive::new(gz);

    // the path of the file to read within the archive
    let path_wanted = Path::new("tar/archive.txt");
    let mut entries = archive.entries()?;
    let mut handle: Option<Handle> = None;

    // iterate through the files in the archive until the desired file is found
    while let Some(entry_result) = entries.next().await {
        if let Ok(entry) = entry_result
            && let Ok(path) = entry.path()
            && path == path_wanted
        {
            handle = Some(Handle::TarGz(entry));
        }
    }

    // use the assigned variable to read the archive entry's contents
    if let Some(mut tgz) = handle {
        match tgz {
            Handle::TarGz(ref mut tgz) => {
                let mut vec_buf = vec![0; 10];
                loop {
                    match tgz.take(10).read_to_end(&mut vec_buf).await {
                        // here the actual problem becomes visible
                        // while reading the entry, n is greater than 0 but nothing is written to the buffer
                        Ok(n) if n > 0 => {
                            println!("n: {} - {:?}", n, &vec_buf[..n]);
                        }
                        _ => break,
                    }
                }
            }
            _ => panic!("case not covered"),
        }
    } else {
        println!("path_wanted not found in archive");
    }
    Ok(())
}

The file tar/archive.txt inside the tar archive looks like:

127.0.0.1   test.com
127.0.0.1   test.de

The key point of the code snippet is that the Handle is assigned to a variable (actually a field of a struct in the real program), which is then used further down in the program.

The output looks like:

tarvester on  master [?] is 📦 v0.1.0 via 🦀 v1.66.0-nightly
❯ cargo run
   Compiling tarvester v0.1.0 (/home/bob/code/gardion/tarvester)
    Finished dev [unoptimized + debuginfo] target(s) in 0.54s
     Running `target/debug/tarvester`
n: 10 - [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
n: 10 - [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
n: 10 - [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
n: 7 - [0, 0, 0, 0, 0, 0, 0]

As you can see it's apparently reading the correct number of bytes but the buffer still stays empty.

A modified version of the code (which is not practical for the architecture of my program) where the assigned handle is used right after assignment to the variable kind of works:

#![feature(let_chains)]
use std::path::Path;

use async_compression::tokio::bufread::GzipDecoder;
use futures::StreamExt;
use tokio::{
    fs::File,
    io::{AsyncReadExt, BufReader},
};
use tokio_tar::{Archive, Entry};

#[derive(Debug)]
pub enum Handle {
    File(BufReader<File>),
    Gz(GzipDecoder<BufReader<File>>),
    TarGz(Entry<Archive<GzipDecoder<BufReader<File>>>>),
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let f = File::open("../harvester/archive.tar.gz").await?;
    let gz = GzipDecoder::new(BufReader::new(f));
    let mut archive = Archive::new(gz);

    let path_wanted = Path::new("tar/archive.txt");
    let mut entries = archive.entries()?;
    let mut handle: Option<Handle> = None;
    while let Some(entry_result) = entries.next().await {
        if let Ok(entry) = entry_result
            && let Ok(path) = entry.path()
            && path == path_wanted
        {
            handle = Some(Handle::TarGz(entry));
            // !! the handle assigned to the variable is used right away without leaving the block
            match handle.as_mut().unwrap() {
                Handle::TarGz(tgz) => {
                    let mut vec_buf = vec![0; 10];
                    loop {
                        match tgz.take(10).read_to_end(&mut vec_buf).await {
                            // some messed up output shows up here
                            Ok(n) if n > 0 => {
                                println!("{}", String::from_utf8_lossy(&vec_buf[..]));
                            },
                            _ => break
                        }
                    }
                },
                _ => panic!("case not covered"),
            }
        }
    }
    Ok(())
}

The latter code produces output which looks a bit messed up though:

127.0.0.1
127.0.0.1       test.com
1
127.0.0.1       test.com
127.0.0.1       t
127.0.0.1       test.com
127.0.0.1       test.de

Can somebody spot what I'm doing wrong here? Or is this maybe a bug in tokio-tar? If it's more likely a bug in the dependency, I'd also appreciate suggestions on how to go on with the debugging even if I need to dive further into tokio-tar.


Solution

  • A working example:

    #![feature(let_chains)]
    use std::path::Path;
    
    use async_compression::tokio::bufread::GzipDecoder;
    use futures::StreamExt;
    use tokio::{
        fs::File,
        io::{AsyncReadExt, BufReader},
    };
    use tokio_tar::{Archive, Entry};
    
    #[derive(Debug)]
    pub enum Handle {
        File(BufReader<File>),
        Gz(GzipDecoder<BufReader<File>>),
        TarGz(Entry<Archive<GzipDecoder<BufReader<File>>>>),
    }
    
    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // prepare the reader
        let f = File::open("./archive.tar.gz").await?;
        let gz = GzipDecoder::new(BufReader::new(f));
        let mut archive = Archive::new(gz);
    
        // the path of the file to read within the archive
        let path_wanted = Path::new("tar/archive.txt");
        let mut entries = archive.entries()?;
        let mut handle: Option<Handle> = None;
    
        // iterate through the files in the archive until the desired file is found
        while let Some(entry_result) = entries.next().await {
            if let Ok(entry) = entry_result
                && let Ok(path) = entry.path()
                && path == path_wanted
            {
                handle = Some(Handle::TarGz(entry));
                break;
            }
        }
    
        // use the assigned variable to read the archive entry's contents
        if let Some(mut tgz) = handle {
            match tgz {
                Handle::TarGz(ref mut tgz) => {
                    let mut vec_buf = Vec::with_capacity(10);
                    loop {
                        // read_to_end appends, so empty the buffer before each round
                        vec_buf.clear();
                        match tgz.take(10).read_to_end(&mut vec_buf).await {
                            // the n bytes just read were appended to the now-empty
                            // buffer, so they occupy vec_buf[..n]
                            Ok(n) if n > 0 => {
                                println!("n: {} - {:?}", n, &vec_buf[..n]);
                            }
                            _ => break,
                        }
                    }
                }
                _ => panic!("case not covered"),
            }
        } else {
            println!("path_wanted not found in archive");
        }
        Ok(())
    }
    

    There are two problems in your code.

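    First, the loop in your code might not do what you expect, because it keeps polling the stream after the wanted entry has been found. The entries of a tar archive share the archive's underlying reader and are meant to be consumed in sequence, so by the time the stream terminates the reader has moved on past the entry you stored, and reading from the stored handle may no longer give you that entry's contents. That's why I added a break there.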

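    Second, read_to_end appends the data to vec_buf instead of writing it from index 0. Your buffer is created with vec![0; 10], so it already holds ten zeros; the bytes that are read end up behind them, and &vec_buf[..n] prints only the zeros. Starting from an empty Vec::with_capacity(10) avoids that. As the documentation of read_to_end puts it: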

    All bytes read from this source will be appended to the specified buffer buf. This function will continuously call read() to append more data to buf until read() returns Ok(0).
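
The append behaviour can be reproduced without tokio or tokio-tar. The following is only a minimal sketch: it assumes that the synchronous std::io::Read::read_to_end behaves like the async version in this respect (both are documented as appending), and read_into_prefilled is a hypothetical helper name, not part of any library.

```rust
use std::io::{Cursor, Read};

/// Hypothetical helper: reads at most `limit` bytes from `data` into a buffer
/// that already contains `prefill` zeros, like `vec![0; 10]` in the question.
/// Returns the number of bytes read together with the resulting buffer.
fn read_into_prefilled(
    data: &[u8],
    limit: u64,
    prefill: usize,
) -> std::io::Result<(usize, Vec<u8>)> {
    let mut buf = vec![0u8; prefill];
    // `read_to_end` appends to `buf`; it does not overwrite existing elements.
    let n = Cursor::new(data).take(limit).read_to_end(&mut buf)?;
    Ok((n, buf))
}

fn main() -> std::io::Result<()> {
    let (n, buf) = read_into_prefilled(b"127.0.0.1\ttest.com\n", 10, 10)?;
    // The first `n` elements are still the zeros the buffer started with ...
    assert_eq!(&buf[..n], &[0u8; 10][..]);
    // ... and the bytes that were actually read sit behind them.
    assert_eq!(&buf[10..], &b"127.0.0.1\t"[..]);
    println!("n: {} - {:?}", n, buf);
    Ok(())
}
```

Slicing with `&buf[..n]` therefore only shows the data that was read if the buffer was empty before the call.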

    Alternatively, you could use filter to pick the first matching entry:

    #![feature(let_chains)]
    use std::path::Path;
    
    use async_compression::tokio::bufread::GzipDecoder;
    use futures::{future, StreamExt};
    use tokio::{
        fs::File,
        io::{AsyncReadExt, BufReader},
    };
    use tokio_tar::{Archive, Entry};
    
    #[derive(Debug)]
    pub enum Handle {
        File(BufReader<File>),
        Gz(GzipDecoder<BufReader<File>>),
        TarGz(Entry<Archive<GzipDecoder<BufReader<File>>>>),
    }
    
    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // prepare the reader
        let f = File::open("./archive.tar.gz").await?;
        let gz = GzipDecoder::new(BufReader::new(f));
        let mut archive = Archive::new(gz);
    
        // the path of the file to read within the archive
        let path_wanted = Path::new("tar/archive.txt");
        let entries = archive.entries()?;
    
        // iterate through the files in the archive until the desired file is found
        let handle = entries
            .filter(|entry_result| {
                future::ready(
                    if let Ok(entry) = entry_result
                        && let Ok(path) = entry.path()
                        && path == path_wanted
                    {
                        true
                    } else {
                        false
                    },
                )
            })
            .next()
            .await
            .map(|entry| Handle::TarGz(entry.unwrap()));
    
        // use the assigned variable to read the archive entry's contents
        if let Some(mut tgz) = handle {
            match tgz {
                Handle::TarGz(ref mut tgz) => {
                    let mut vec_buf = Vec::with_capacity(10);
                    loop {
                        // read_to_end appends, so empty the buffer before each round
                        vec_buf.clear();
                        match tgz.take(10).read_to_end(&mut vec_buf).await {
                            // the n bytes just read were appended to the now-empty
                            // buffer, so they occupy vec_buf[..n]
                            Ok(n) if n > 0 => {
                                println!("n: {} - {:?}", n, &vec_buf[..n]);
                            }
                            _ => break,
                        }
                    }
                }
                _ => panic!("case not covered"),
            }
        } else {
            println!("path_wanted not found in archive");
        }
        Ok(())
    }
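
Because read_to_end appends, a read loop that reuses one buffer across rounds can also leave the data accumulated and look only at the tail that each round added. The sketch below illustrates that idea with the synchronous std::io API, on the assumption that the async loop behaves the same way. read_rounds is a hypothetical helper, and the tab-separated sample text is a stand-in for the file in the archive, not its verified contents.

```rust
use std::io::{Cursor, Read};

/// Hypothetical helper: reads `src` in rounds of at most `chunk` bytes into one
/// growing buffer. Returns the bytes added by each round and the whole buffer.
fn read_rounds<R: Read>(mut src: R, chunk: u64) -> std::io::Result<(Vec<Vec<u8>>, Vec<u8>)> {
    let mut buf = Vec::new();
    let mut rounds = Vec::new();
    loop {
        // `by_ref` lends the reader to `take`, so it can be reused next round.
        let n = src.by_ref().take(chunk).read_to_end(&mut buf)?;
        if n == 0 {
            break; // nothing left to read
        }
        // The bytes of this round are the last `n` elements, not the first `n`.
        let start = buf.len() - n;
        rounds.push(buf[start..].to_vec());
    }
    Ok((rounds, buf))
}

fn main() -> std::io::Result<()> {
    // Stand-in data; the real input would be the archive entry.
    let data = "127.0.0.1\ttest.com\n127.0.0.1\ttest.de\n";
    let (rounds, all) = read_rounds(Cursor::new(data), 10)?;
    for round in &rounds {
        println!("n: {} - {:?}", round.len(), String::from_utf8_lossy(round));
    }
    // Once the loop ends, the accumulated buffer holds the complete input.
    assert_eq!(all, data.as_bytes());
    Ok(())
}
```

Whether to clear the buffer each round or to keep accumulating depends on whether the caller wants the chunks or the whole entry at the end.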