Search code examples
rustrust-tokio

Async BehaviorSubject for Rust


I need a data container for sharing mutable state in an asynchronous context similar to the BehaviorSubject in RxJs. More precisely, I need a struct BehaviorSubject<T> containing an item of type T such that:

  • We can clone the container and pass it to other threads/tasks (i.e. the container needs to implement Clone + Send + Sync (I need it as part of an axum app state).
  • We can turn the container into a stream for observing the underlying T. This stream starts by sending the current value and then sends all updates (intermediate events don't matter - I only need the latest item at any time).
  • We can get/set the item inside the container. On each mutation, the item gets mutated in all cloned containers, and all streams are notified.

I have an implementation (see below) which almost works as intended, but still has some bugs:

  • Slow receivers seem to slow down other receivers (at least in the beginning - I don't really understand what is going on here).
  • Cleanup does not work properly: When turning all but one clones of a BehaviorSubject into streams and dropping the last one, I would expect all streams to stop. But that does not seem to happen.

Any suggestions (implementation improvements / alternative approaches / other libraries) are highly appreciated!

use futures::Stream;
use std::sync::Arc;
use tokio::sync::{
    broadcast::{channel, error::RecvError, Sender},
    Mutex,
};

#[derive(Clone)]
pub struct BehaviorSubject<T> {
    value: Arc<Mutex<T>>,
    sender: Sender<T>,
}

impl<T: Clone> BehaviorSubject<T> {
    pub fn new(value: T) -> Self {
        let (sender, _) = channel(1);
        BehaviorSubject {
            value: Arc::new(Mutex::new(value)),
            sender,
        }
    }

    pub async fn get_value(&self) -> T {
        self.value.lock().await.clone()
    }

    pub async fn set_value(&self, new_value: T) {
        let mut cur_value = self.value.lock().await;
        *cur_value = new_value.clone();
        _ = self.sender.send(new_value);
    }

    pub fn into_stream(self) -> impl Stream<Item = T> {
        async_stream::stream! {
            let mut rx = self.sender.subscribe();
            yield self.value.lock().await.clone();
            loop {
                match rx.recv().await {
                    Ok(value) => yield value,
                    Err(RecvError::Lagged(_)) => (),
                    Err(RecvError::Closed) => break,
                }
            }
        }
    }
}

Solution

  • Thanks to Chayim Friedman's suggestions, I found a very slim (and hopefully bug free) implementation for BehaviorSubject<T> by wrapping tokio's watch channel:

    use futures::Stream;
    use std::sync::Arc;
    use tokio::sync::watch::{channel, Receiver, Sender};
    use tokio_stream::wrappers::WatchStream;
    
    #[derive(Clone)]
    pub struct BehaviorSubject<T> {
        sender: Arc<Sender<T>>,
        receiver: Receiver<T>,
    }
    
    impl<T: 'static + Clone + Send + Sync> BehaviorSubject<T> {
        pub fn new(value: T) -> Self {
            let (sender, receiver) = channel(value);
            Self {
                sender: Arc::new(sender),
                receiver,
            }
        }
    
        pub fn get_value(&self) -> T {
            self.receiver.borrow().clone()
        }
    
        pub fn set_value(&self, value: T) {
            _ = self.sender.send(value)
        }
    
        pub fn into_stream(self) -> impl Stream<Item = T> {
            WatchStream::new(self.receiver)
        }
    }
    

    Use case:

    Just in case anyone is interested in use cases: I plan to use this struct for sharing a common mutable resource through a web server. It is designed for being used in that way:

    use axum::{
        extract::{Path, State},
        http::StatusCode,
        response::{
            sse::{self, KeepAlive},
            IntoResponse, Sse,
        },
        routing::get,
        Json, Router,
    };
    use behavior_subject::BehaviorSubject;
    use futures_util::StreamExt;
    
    #[derive(Clone)]
    struct AppState {
        subject: BehaviorSubject<u32>,
    }
    
    #[tokio::main]
    async fn main() {
        let app = Router::new()
            .route("/get", get(value_getter))
            .route("/set/:value", get(value_setter))
            .route("/stream", get(stream_handler))
            .with_state(AppState {
                subject: BehaviorSubject::new(0),
            });
    
        axum::Server::bind(&"127.0.0.1:8080".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap()
    }
    
    async fn value_getter(State(state): State<AppState>) -> impl IntoResponse {
        Json::from(state.subject.get_value())
    }
    
    async fn value_setter(Path(value): Path<u32>, State(state): State<AppState>) -> impl IntoResponse {
        state.subject.set_value(value);
        StatusCode::OK
    }
    
    async fn stream_handler(State(state): State<AppState>) -> impl IntoResponse {
        Sse::new(
            state
                .subject
                .into_stream()
                .map(|val| sse::Event::default().json_data(val)),
        )
        .keep_alive(KeepAlive::default())
        .into_response()
    }