I wrote the following code but I don't fully understand why I have to use 'static
lifetimes.
use std::fmt::Debug;

use futures::future::{select_all, BoxFuture, FutureExt};
// Assumption: the original does not show where `info!` comes from; `tracing::info` would work equally well.
use log::info;
use tokio::sync::watch::{self, Receiver};

pub async fn combine_streams<T, F>(
    mut streams: Vec<Receiver<Option<T>>>,
    combiner: F,
) -> Receiver<Option<T>>
where
    // I think "static" here refers to the lifetime of the type and not instances of it
    T: Clone + Send + Sync + Debug + 'static,
    F: Fn(Vec<Option<T>>) -> Option<T> + Send + Sync + 'static,
{
    let (tx, rx) = watch::channel(None);

    info!("Waiting for initial values...");
    // Await each stream until it holds a value, rather than spinning in a
    // busy loop that never yields to the runtime.
    for stream in streams.iter_mut() {
        while stream.borrow_and_update().is_none() {
            stream
                .changed()
                .await
                .expect("Input channel closed before sending a value");
        }
    }

    let mut current_values = streams
        .iter()
        .map(|stream| stream.borrow().clone())
        .collect::<Vec<Option<T>>>();
    info!("Received initial values: {:?}", current_values);

    let combined_value = combiner(current_values.clone());
    tx.send(combined_value).expect("Couldn't send output over channel");

    // The spawned task takes ownership of `streams`, `combiner`, `current_values` and `tx`.
    tokio::spawn(async move {
        loop {
            // wait for any stream to change
            let any_change = streams
                .iter_mut()
                .map(|stream| stream.changed().boxed())
                .collect::<Vec<BoxFuture<_>>>();
            let (_, index, _) = select_all(any_change).await;
            current_values[index] = streams[index].borrow().clone();
            let new_value = combiner(current_values.clone());
            tx.send(new_value).expect("Couldn't send output over channel");
        }
    });

    rx
}
I don't want to pass in objects that live forever. They are meant to be read from the channel and discarded.
What does 'static mean here? Does it refer to the lifetime of the type itself (as in, the concrete type for T or F that will be used when calling)? Or does it refer to the lifetime of the particular instances passed in?

A lifetime bound on a generic means that the generic type must be valid for at least that lifetime, but a value of that type may be dropped or go unused before that lifetime elapses.
For T: 'static, that means that T must be able to last as long as 'static (i.e. until the death of the program), but that doesn't mean it necessarily will.
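To make the distinction concrete, here is a minimal sketch using only the standard library. The names `Message`, `consume` and `demo_dropped_early` are hypothetical and exist only for illustration. It shows a value whose type satisfies `T: 'static` (it owns all its data and holds no borrowed references) being dropped almost immediately, and, in a comment, a borrowed reference that would be rejected by the same bound.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical type that owns all of its data, so `Message: 'static` holds.
struct Message {
    dropped: Arc<AtomicBool>,
}

impl Drop for Message {
    fn drop(&mut self) {
        // Record that this instance was destroyed.
        self.dropped.store(true, Ordering::SeqCst);
    }
}

// Hypothetical helper: the bound constrains the type `T`, not how long
// any particular value of that type stays alive.
fn consume<T: 'static>(value: T) {
    drop(value); // the instance is gone here, long before the program ends
}

// Returns true if the `Message` was dropped as soon as `consume` returned.
fn demo_dropped_early() -> bool {
    let flag = Arc::new(AtomicBool::new(false));
    consume(Message {
        dropped: Arc::clone(&flag),
    });
    flag.load(Ordering::SeqCst)

    // By contrast, a reference to a local variable is not `'static`:
    //
    //     let local = String::from("local");
    //     consume(&local); // does not compile: `local` does not live long enough
}
```

Calling `demo_dropped_early()` should return `true`: the bound was satisfied, yet the instance lived only for the duration of one function call.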
tokio::spawn requires that the future passed to it, and all the data it captures, be 'static (able to last until the program terminates), because the spawned task runs independently of the caller. Unless you await the JoinHandle it returns, nothing ties the task to the calling scope, and the task may loop forever and never actually terminate. If it were possible for the task to be passed a temporary reference, it could access that reference after the value it was pointing to had been dropped.
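The same rule can be sketched without an async runtime, since `std::thread::spawn` places the same `Send + 'static` bound on its closure. This is a stand-in for `tokio::spawn`, not tokio code, and the function names are hypothetical. Moving owned data into the closure satisfies the bound. Borrowing only works with `std::thread::scope`, which guarantees the thread is joined before the borrow ends; that part is a std feature shown purely as contrast.

```rust
use std::thread;

// Owned data is moved into the closure, so the closure is `'static`
// and may safely outlive the caller's stack frame.
fn sum_in_spawned_thread(values: Vec<i32>) -> i32 {
    let handle = thread::spawn(move || values.iter().sum::<i32>());
    handle.join().expect("worker thread panicked")
}

// Scoped threads may borrow, because `thread::scope` does not return
// until every thread spawned inside it has finished.
fn sum_in_scoped_thread(values: &[i32]) -> i32 {
    thread::scope(|s| {
        s.spawn(|| values.iter().sum::<i32>())
            .join()
            .expect("worker thread panicked")
    })
}

// Does not compile: the closure borrows `values`, which is not `'static`,
// and nothing forces the thread to finish before `values` goes away.
//
// fn sum_borrowed(values: &[i32]) -> i32 {
//     thread::spawn(|| values.iter().sum::<i32>()).join().unwrap()
// }
```

In `combine_streams`, the `async move` block plays the role of the `move` closure: it takes ownership of `streams`, `combiner` and `tx`, which is why the types `T` and `F` must themselves be free of short-lived borrows.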