Search code examples
Tags: rust, libcurl

How to read buffered lines directly from curl in Rust


I'm using the curl crate to download a gzipped file:

use curl::easy::Easy;

/// Downloads `https://example.com/file.txt.gz` into `file.txt.gz` in the
/// current directory.
///
/// # Errors
/// Returns an error if the output file cannot be created, if curl rejects the
/// URL, or if the transfer fails. A failure to write received data to disk
/// aborts the transfer and surfaces here as a curl write error; the underlying
/// `io::Error` detail is not preserved.
fn download_file() -> MyResult<()> {
    let mut file = File::create("file.txt.gz")?;
    let mut easy = Easy::new();
    easy.url("https://example.com/file.txt.gz")?;
    easy.write_function(move |data| {
        // Reporting fewer bytes than were delivered makes libcurl abort the
        // transfer with a write error (returned by `perform` below) instead of
        // panicking inside the callback.
        let written = if file.write_all(data).is_ok() { data.len() } else { 0 };
        Ok(written)
    })?;
    easy.perform()?;
    Ok(())
}

I'm using the flate2 crate to decompress the file and read it one line at a time:

use flate2::read::GzDecoder;

/// Decompresses `file.txt.gz` from the current directory and prints every
/// line of its contents to stdout.
///
/// # Errors
/// Fails if the file cannot be opened or if reading/decompressing a line fails.
fn import_file() -> MyResult<()> {
    let compressed = File::open("file.txt.gz")?;
    // Buffering the decompressed bytes lets us split them into lines.
    let lines = BufReader::new(GzDecoder::new(compressed)).lines();
    for text in lines {
        println!("{}", text?);
    }
    Ok(())
}

I would like to combine these steps, but GzDecoder needs an object that implements the Read trait, whereas Easy pushes the downloaded data into a write callback, and I'm not sure how to connect the two. I suspect this will require running the download and import steps on separate threads:

use curl::easy::Easy;
use flate2::read::GzDecoder;

/// Sketch of the combined download-and-decompress step. It is intentionally
/// incomplete: the two `(what goes here?)` placeholders are the open question.
fn download_and_import_file() -> MyResult<()> {
    // Producer: run the curl transfer on its own thread so the current thread
    // can consume the data while it arrives.
    let handle = thread::spawn(|| {
        let mut easy = Easy::new();
        easy.url("https://example.com/file.txt.gz").unwrap();
        easy.write_function(move |data| {
            // (what goes here?)
            // Reporting `data.len()` tells curl the whole chunk was consumed.
            return Ok(data.len());
        }).unwrap();
        easy.perform().unwrap();
    });
    // Consumer: needs some `Read` source that yields the chunks handed to
    // the write callback above.
    let reader = // (what goes here?)
    let reader = BufReader::new(GzDecoder::new(reader));
    for line in reader.lines() {
        let line = line?;
        println!("{line}");
    }
    // Wait for the download thread; `unwrap` propagates a panic from it.
    handle.join().unwrap();
    return Ok(());
}

Solution

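  • You can create a type that acts as an in-memory pipe: one clone of it is written to (here, from the curl write callback) and another clone is consumed through its Read implementation, which blocks until data arrives or the stream is closed: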

    use std::collections::VecDeque;
    use std::io::{self, Read};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Condvar, Mutex};
    
    /// An unbounded in-memory byte pipe shared between threads.
    ///
    /// All clones share the same buffer. One clone is fed with
    /// [`InMemoryStream::write`] and finally [`InMemoryStream::close`]d;
    /// another clone is consumed through its [`Read`] implementation, whose
    /// reads block while the buffer is empty and the stream is still open.
    #[derive(Clone, Debug, Default)]
    pub struct InMemoryStream {
        inner: Arc<InMemoryStreamInner>,
    }

    /// State shared by all clones of an [`InMemoryStream`].
    #[derive(Debug, Default)]
    struct InMemoryStreamInner {
        /// Bytes written but not yet read.
        data: Mutex<VecDeque<u8>>,
        /// Notified whenever bytes are appended or the stream is closed.
        condvar: Condvar,
        /// Set by `close`. It is only stored while `data` is locked, so a
        /// reader that checks it under that lock cannot miss the wake-up.
        closed: AtomicBool,
    }

    impl InMemoryStream {
        /// Appends `data` to the stream and wakes one blocked reader.
        ///
        /// # Panics
        /// Panics if the internal mutex is poisoned.
        pub fn write(&self, data: &[u8]) {
            let mut pending = self.inner.data.lock().expect("InMemoryStream mutex poisoned");
            pending.extend(data);
            self.inner.condvar.notify_one();
        }

        /// Marks the end of the stream. Once the remaining buffered bytes
        /// have been read, further reads return `Ok(0)` (end of file).
        ///
        /// # Panics
        /// Panics if the internal mutex is poisoned.
        pub fn close(&self) {
            // Hold the lock while flipping the flag. Otherwise the flag could
            // change (and the notification fire) between a reader's check and
            // its call to `wait`, leaving that reader asleep forever.
            let _guard = self.inner.data.lock().expect("InMemoryStream mutex poisoned");
            self.inner.closed.store(true, Ordering::SeqCst);
            self.inner.condvar.notify_all();
        }
    }

    impl Read for InMemoryStream {
        /// Reads up to `buf.len()` buffered bytes, blocking while the buffer
        /// is empty and the stream has not been closed.
        ///
        /// Returns `Ok(0)` only when `buf` is empty or at end of stream.
        ///
        /// # Panics
        /// Panics if the internal mutex is poisoned.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            // Nothing can be copied into an empty buffer; don't block on it.
            if buf.is_empty() {
                return Ok(0);
            }
            let inner = &*self.inner;
            let guard = inner.data.lock().expect("InMemoryStream mutex poisoned");
            // `wait_while` re-checks the condition after every wake-up, so a
            // spurious wake-up is never mistaken for end of stream.
            let mut pending = inner
                .condvar
                .wait_while(guard, |queued| {
                    queued.is_empty() && !inner.closed.load(Ordering::SeqCst)
                })
                .expect("InMemoryStream mutex poisoned");
            pending.read(buf)
        }
    }
    

    Then use it to connect the curl download thread to GzDecoder:

    fn download_and_import_file() -> MyResult<()> {
        let stream = InMemoryStream::default();
        let handle = thread::spawn({
            let stream = stream.clone();
            let mut stream2 = stream.clone();
            || {
                let mut easy = Easy::new();
                easy.url("https://example.com/file.txt.gz").unwrap();
                easy.write_function(move |data| {
                    stream.write(data);
                    return Ok(data.len());
                })
                .unwrap();
                easy.perform().unwrap();
                stream2.close();
            }
        });
        let reader = BufReader::new(GzDecoder::new(stream));
        for line in reader.lines() {
            let line = line?;
            println!("{line}");
        }
        handle.join().unwrap();
        return Ok(());
    }