I'm using futures, tokio, hyper, and serde_json to request and deserialize some data that I need to hold until my next request. My initial thought was to make a struct containing the hyper::Chunk
and the deserialized data that borrows from the Chunk
, but couldn't get the lifetimes right. I tried using the rental crate, but I can't get this to work either. Perhaps I'm using the 'buffer
lifetime before declaring the buffer Vec
, but maybe I've messed something else up:
#[rental]
pub struct ChunkJson<T: serde::de::Deserialize<'buffer>> {
buffer: Vec<u8>,
json: T
}
Is there some way to make the lifetimes right or should I just use DeserializeOwned
and give up on zero-copy?
For more context, the following code works (periodically deserializing JSON from two URLs, retaining the results so we can do something with them both). I'd like to change my X
and Y
types to use Cow<'a, str>
for their fields, changing from DeserializeOwned
to Deserialize<'a>
. For this to work, I need to store the slice that has been deserialized for each, but I don't know how to do this. I'm looking for examples that use Serde's zero-copy deserialization and retain the result, or some idea for restructuring my code that would work.
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
extern crate futures;
extern crate tokio_core;
extern crate tokio_periodic;
extern crate hyper;
use std::collections::HashMap;
use std::error::Error;
use futures::future;
use futures::Future;
use futures::stream::Stream;
use hyper::Client;
fn stream_json<'a, T: serde::de::DeserializeOwned + Send + 'a>
(handle: &tokio_core::reactor::Handle,
url: String,
period: u64)
-> Box<Stream<Item = T, Error = Box<Error>> + 'a> {
let client = Client::new(handle);
let timer = tokio_periodic::PeriodicTimer::new(handle).unwrap();
timer
.reset(::std::time::Duration::new(period, 0))
.unwrap();
Box::new(futures::Stream::zip(timer.from_err::<Box<Error>>(), futures::stream::unfold( (), move |_| {
let uri = url.parse::<hyper::Uri>().unwrap();
let get = client.get(uri).from_err::<Box<Error>>().and_then(|res| {
res.body().concat().from_err::<Box<Error>>().and_then(|chunks| {
let p: Result<T, Box<Error>> = serde_json::from_slice::<T>(chunks.as_ref()).map_err(|e| Box::new(e) as Box<Error>);
match p {
Ok(json) => future::ok((json, ())),
Err(err) => future::err(err)
}
})
});
Some(get)
})).map(|x| { x.1 }))
}
#[derive(Serialize, Deserialize, Debug)]
pub struct X {
foo: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Y {
bar: String,
}
fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let x_stream = stream_json::<HashMap<String, X>>(&handle, "http://localhost/X".to_string(), 2);
let y_stream = stream_json::<HashMap<String, Y>>(&handle, "http://localhost/Y".to_string(), 5);
let mut xy_stream = x_stream.merge(y_stream);
let mut last_x = HashMap::new();
let mut last_y = HashMap::new();
loop {
match core.run(futures::Stream::into_future(xy_stream)) {
Ok((Some(item), stream)) => {
match item {
futures::stream::MergedItem::First(x) => last_x = x,
futures::stream::MergedItem::Second(y) => last_y = y,
futures::stream::MergedItem::Both(x, y) => {
last_x = x;
last_y = y;
}
}
println!("\nx = {:?}", &last_x);
println!("y = {:?}", &last_y);
// Do more stuff with &last_x and &last_y
xy_stream = stream;
}
Ok((None, stream)) => xy_stream = stream,
Err(_) => {
panic!("error");
}
}
}
}
When trying to solve a complicated programming problem, it's very useful to remove as much as you can. Take your code and remove what you can until the problem goes away. Tweak your code a bit and keep removing until you can't any more. Then, turn the problem around and build from the smallest piece and work back to the error. Doing both of these will show you where the problem lies.
First, let's make sure we deserialize correctly:
extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;
use std::borrow::Cow;
#[derive(Debug, Deserialize)]
pub struct Example<'a> {
#[serde(borrow)]
name: Cow<'a, str>,
key: bool,
}
impl<'a> Example<'a> {
fn info(&self) {
println!("{:?}", self);
match self.name {
Cow::Borrowed(_) => println!("Is borrowed"),
Cow::Owned(_) => println!("Is owned"),
}
}
}
fn main() {
let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();
let decoded: Example = serde_json::from_slice(&data).expect("Couldn't deserialize");
decoded.info();
}
Here, I forgot to add the #[serde(borrow)]
attribute, so I'm glad I did this test!
Next, we can introduce the rental crate:
#[macro_use]
extern crate rental;
rental! {
mod holding {
use super::*;
#[rental]
pub struct VecHolder {
data: Vec<u8>,
parsed: Example<'data>,
}
}
}
fn main() {
let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();
let holder = holding::VecHolder::try_new(data, |data| {
serde_json::from_slice(data)
});
let holder = match holder {
Ok(holder) => holder,
Err(_) => panic!("Unable to construct rental"),
};
holder.rent(|example| example.info());
// Make sure we can move the data and it's still valid
let holder2 = { holder };
holder2.rent(|example| example.info());
}
Next we try to create a rental of Chunk
:
#[rental]
pub struct ChunkHolder {
data: Chunk,
parsed: Example<'data>,
}
Unfortunately, this fails:
--> src/main.rs:29:1
|
29 | rental! {
| ^
|
= help: message: Field `data` must have an angle-bracketed type parameter or be `String`.
Oops! Checking the docs for rental, we can add #[target_ty_hack="[u8]"]
to the data
field. This leads to:
error[E0277]: the trait bound `hyper::Chunk: rental::__rental_prelude::StableDeref` is not satisfied
--> src/main.rs:29:1
|
29 | rental! {
| ^ the trait `rental::__rental_prelude::StableDeref` is not implemented for `hyper::Chunk`
|
= note: required by `rental::__rental_prelude::static_assert_stable_deref`
That's annoying; since we can't implement that trait for Chunk
, we just need to box Chunk
, proving that it has a stable address:
#[rental]
pub struct ChunkHolder {
data: Box<Chunk>,
parsed: Example<'data>,
}
I also looked to see if there is a way to get a Vec<u8>
back out of Chunk
, but it doesn't appear to exist. That would have been another solution with less allocation and indirection.
At this point, "all" that's left is to integrate this back into the futures code. It's a lot of work for anyone but you to recreate that, but I don't foresee any obvious problems in doing so.