I'm using the `curl` crate to download a gzipped file:
use curl::easy::Easy;
/// Downloads `https://example.com/file.txt.gz` into `file.txt.gz` in the current directory.
///
/// # Errors
/// Returns an error if the file cannot be created, the URL is rejected, or the
/// transfer fails. A failed disk write aborts the transfer and is reported as a
/// curl write error (the underlying `io::Error` detail is not preserved).
fn download_file() -> MyResult<()> {
    let mut file = File::create("file.txt.gz")?;
    let mut easy = Easy::new();
    easy.url("https://example.com/file.txt.gz")?;
    easy.write_function(move |data| {
        // Reporting fewer bytes than were handed to us makes curl abort the
        // transfer, so `perform` returns an error instead of the callback panicking.
        Ok(if file.write_all(data).is_ok() {
            data.len()
        } else {
            0
        })
    })?;
    easy.perform()?;
    Ok(())
}
I'm using the `flate2` crate to decompress the file and read it one line at a time:
use flate2::read::GzDecoder;
/// Prints every line of the gzip-compressed `file.txt.gz` to stdout.
///
/// # Errors
/// Propagates a failure to open the file, and the first error reported while
/// reading a decompressed line.
fn import_file() -> MyResult<()> {
    let compressed = File::open("file.txt.gz")?;
    let decoded_lines = BufReader::new(GzDecoder::new(compressed)).lines();
    for next in decoded_lines {
        let text = next?;
        println!("{text}");
    }
    Ok(())
}
I would like to combine these steps, but `GzDecoder` requires an object implementing the `Read` trait, and I'm not sure how to hook that up to `Easy`. I suspect this will require doing the download and import steps on separate threads:
use curl::easy::Easy;
use flate2::read::GzDecoder;
/// Question sketch: run the curl transfer on a background thread and decode its
/// output on the current thread. Not compilable as written; the two
/// `(what goes here?)` placeholders are the open question.
fn download_and_import_file() -> MyResult<()> {
// Producer: the download runs on its own thread; each `unwrap` panics on a curl error.
let handle = thread::spawn(|| {
let mut easy = Easy::new();
easy.url("https://example.com/file.txt.gz").unwrap();
// The closure receives `data` chunks from curl; they need to be handed to the
// reader below somehow.
easy.write_function(move |data| {
// (what goes here?)
return Ok(data.len());
}).unwrap();
easy.perform().unwrap();
});
// Consumer: needs a value implementing `Read` that yields the bytes passed to the
// write_function closure above, so it can be wrapped in `GzDecoder` on the next line.
let reader = // (what goes here?)
let reader = BufReader::new(GzDecoder::new(reader));
for line in reader.lines() {
let line = line?;
println!("{line}");
}
// Panics here if the download thread panicked.
handle.join().unwrap();
return Ok(());
}
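You can create a type that serves as an in-memory `Read` implementation which you can write into. Clones share the same buffer, so the download thread can write into one handle while the main thread reads from another: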
use std::collections::VecDeque;
use std::io::{self, Read};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
/// An unbounded, thread-safe, in-memory byte pipe.
///
/// All clones share one buffer. A producer appends bytes with
/// [`InMemoryStream::write`] and signals the end with [`InMemoryStream::close`].
/// A consumer on another thread uses a clone as a blocking [`Read`] source.
///
/// The buffer has no size limit: a producer that outpaces the consumer grows memory
/// without bound. If `close` is never called, readers block forever once the
/// buffered bytes are drained.
#[derive(Clone, Default)]
pub struct InMemoryStream {
    inner: Arc<InMemoryStreamInner>,
}

/// State shared by every clone of an [`InMemoryStream`].
#[derive(Default)]
struct InMemoryStreamInner {
    /// Bytes written but not yet read.
    data: Mutex<VecDeque<u8>>,
    /// Notified whenever bytes are appended or the stream is closed.
    condvar: Condvar,
    /// Set once by `close`; only stored while `data` is locked (see `close`).
    closed: AtomicBool,
}

impl InMemoryStream {
    /// Appends `data` to the stream and wakes one waiting reader.
    pub fn write(&self, data: &[u8]) {
        self.inner.data.lock().unwrap().extend(data);
        self.inner.condvar.notify_one();
    }

    /// Marks the stream as finished.
    ///
    /// Readers still drain any buffered bytes, then observe EOF (`Ok(0)`).
    pub fn close(&self) {
        // Hold the lock so this store cannot land between a reader's `closed` check
        // and its `wait`; otherwise the wakeup is lost and the reader blocks forever.
        let _guard = self.inner.data.lock().unwrap();
        self.inner.closed.store(true, Ordering::SeqCst);
        self.inner.condvar.notify_all();
    }
}

impl Read for InMemoryStream {
    /// Blocks until at least one byte is available or the stream is closed.
    ///
    /// Returns `Ok(0)` only when `buf` is empty, or when the stream is closed and
    /// every buffered byte has been read.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let mut pending = self.inner.data.lock().unwrap();
        loop {
            let copied = pending.read(buf)?;
            if copied > 0 || self.inner.closed.load(Ordering::SeqCst) {
                return Ok(copied);
            }
            // Re-check after every wakeup: `Condvar::wait` may return spuriously,
            // and returning 0 then would look like a premature EOF.
            pending = self.inner.condvar.wait(pending).unwrap();
        }
    }
}
Then use it as follows:
fn download_and_import_file() -> MyResult<()> {
let stream = InMemoryStream::default();
let handle = thread::spawn({
let stream = stream.clone();
let mut stream2 = stream.clone();
|| {
let mut easy = Easy::new();
easy.url("https://example.com/file.txt.gz").unwrap();
easy.write_function(move |data| {
stream.write(data);
return Ok(data.len());
})
.unwrap();
easy.perform().unwrap();
stream2.close();
}
});
let reader = BufReader::new(GzDecoder::new(stream));
for line in reader.lines() {
let line = line?;
println!("{line}");
}
handle.join().unwrap();
return Ok(());
}