I need a data container for sharing mutable state in an asynchronous context similar to the BehaviorSubject in RxJs. More precisely, I need a struct BehaviorSubject<T>
containing an item of type T
such that:
Clone + Send + Sync
(I need it as part of an axum app state).T
. This stream starts by sending the current value and then sends all updates (intermediate events don't matter - I only need the latest item at any time).I have an implementation (see below) which almost works as intended, but still has some bugs:
BehaviorSubject
into streams and dropping the last one, I would expect all streams to stop. But that does not seem to happen.Any suggestions (implementation improvements / alternative approaches / other libraries) are highly appreciated!
use futures::Stream;
use std::sync::Arc;
use tokio::sync::{
broadcast::{channel, error::RecvError, Sender},
Mutex,
};
#[derive(Clone)]
pub struct BehaviorSubject<T> {
value: Arc<Mutex<T>>,
sender: Sender<T>,
}
impl<T: Clone> BehaviorSubject<T> {
pub fn new(value: T) -> Self {
let (sender, _) = channel(1);
BehaviorSubject {
value: Arc::new(Mutex::new(value)),
sender,
}
}
pub async fn get_value(&self) -> T {
self.value.lock().await.clone()
}
pub async fn set_value(&self, new_value: T) {
let mut cur_value = self.value.lock().await;
*cur_value = new_value.clone();
_ = self.sender.send(new_value);
}
pub fn into_stream(self) -> impl Stream<Item = T> {
async_stream::stream! {
let mut rx = self.sender.subscribe();
yield self.value.lock().await.clone();
loop {
match rx.recv().await {
Ok(value) => yield value,
Err(RecvError::Lagged(_)) => (),
Err(RecvError::Closed) => break,
}
}
}
}
}
Thanks to Chayim Friedman's suggestions, I found a very slim (and hopefully bug free) implementation for BehaviorSubject<T>
by wrapping tokio's watch
channel:
use futures::Stream;
use std::sync::Arc;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio_stream::wrappers::WatchStream;
#[derive(Clone)]
pub struct BehaviorSubject<T> {
sender: Arc<Sender<T>>,
receiver: Receiver<T>,
}
impl<T: 'static + Clone + Send + Sync> BehaviorSubject<T> {
pub fn new(value: T) -> Self {
let (sender, receiver) = channel(value);
Self {
sender: Arc::new(sender),
receiver,
}
}
pub fn get_value(&self) -> T {
self.receiver.borrow().clone()
}
pub fn set_value(&self, value: T) {
_ = self.sender.send(value)
}
pub fn into_stream(self) -> impl Stream<Item = T> {
WatchStream::new(self.receiver)
}
}
Use case:
Just in case anyone is interested in use cases: I plan to use this struct for sharing a common mutable resource through a web server. It is designed for being used in that way:
use axum::{
extract::{Path, State},
http::StatusCode,
response::{
sse::{self, KeepAlive},
IntoResponse, Sse,
},
routing::get,
Json, Router,
};
use behavior_subject::BehaviorSubject;
use futures_util::StreamExt;
#[derive(Clone)]
struct AppState {
subject: BehaviorSubject<u32>,
}
#[tokio::main]
async fn main() {
let app = Router::new()
.route("/get", get(value_getter))
.route("/set/:value", get(value_setter))
.route("/stream", get(stream_handler))
.with_state(AppState {
subject: BehaviorSubject::new(0),
});
axum::Server::bind(&"127.0.0.1:8080".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap()
}
async fn value_getter(State(state): State<AppState>) -> impl IntoResponse {
Json::from(state.subject.get_value())
}
async fn value_setter(Path(value): Path<u32>, State(state): State<AppState>) -> impl IntoResponse {
state.subject.set_value(value);
StatusCode::OK
}
async fn stream_handler(State(state): State<AppState>) -> impl IntoResponse {
Sse::new(
state
.subject
.into_stream()
.map(|val| sse::Event::default().json_data(val)),
)
.keep_alive(KeepAlive::default())
.into_response()
}