rust async-await rust-axum rust-futures

Axum SSE endpoint: run an asynchronous function every second with a futures-util stream


I have an SSE endpoint built with Axum. Every second, I want to send an event containing a snapshot of a table in my Postgres database. My current code:

pub async fn payment_sse_handler(
    State(mut state): State<AppState>,
    Extension(current_user): Extension<CurrentUser>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = 
        stream::repeat_with( move || get_balance(&state.db_pool, current_user.user_id.clone()).await) // <-- Error here
        .map(Ok)
        .throttle(Duration::from_secs(1));

    Sse::new(stream).keep_alive(KeepAlive::default())
}

pub async fn get_balance (db_pool: &Pool<Postgres>, user_id: String) -> Event {
    let transaction_history = 
        sqlx::query_as!(
            Transaction, 
            r#"select transaction_id, status, currency, amount, user_id, created_at, processing_fee 
            from transaction_history 
            WHERE user_id=$1 
            ORDER BY created_at 
            LIMIT 5"#,
            user_id
        )
        .fetch_all(db_pool)
        .await;

    match transaction_history {
        Ok(transaction_history) => {
            match serde_json::to_string(&transaction_history) {
                Ok(transaction_history_str) => {Event::default().data(transaction_history_str)},
                Err(_) => {Event::default().data("Error")}
            }
        }
        Err(_) => Event::default().data("Error"),
    }
}

The problem is that this code fails with the error "await is only allowed inside async functions and blocks". I think this happens because || creates a closure that is not async, but when I try to make it async I just get new errors. Does anyone know how to fix this, or am I doing something fundamentally wrong with this approach?


Solution

  • repeat_with() builds a stream from a synchronous closure, so you cannot await inside it. To produce each item from an asynchronous closure, use unfold() instead:

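    // stream::unfold is from futures_util::stream; throttle() is from tokio_stream::StreamExt.
    pub async fn payment_sse_handler(
        State(state): State<AppState>,
        Extension(current_user): Extension<CurrentUser>,
    ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
        // unfold() calls the closure repeatedly and awaits the future it returns.
        let stream = stream::unfold((), move |()| {
            // Clone on every call so that each future owns its inputs.
            let user_id = current_user.user_id.clone();
            let db_pool = state.db_pool.clone();
            // Returning Some((item, next_state)) every time means the stream never ends.
            async move { Some((get_balance(&db_pool, user_id).await, ())) }
        })
        .map(Ok)
        .throttle(Duration::from_secs(1));
    
        Sse::new(stream).keep_alive(KeepAlive::default())
    }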
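
    Why the original closure cannot simply be made async: a closure whose body is an async block returns a future each time it is called, and something still has to drive that future. With repeat_with() the stream items would be futures, not events; unfold() awaits them for you. A minimal std-only sketch of that distinction, where fetch_balance and block_on are hypothetical stand-ins (not part of axum, tokio, or futures):

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for an async database call.
async fn fetch_balance(user_id: u32) -> String {
    format!("balance for user {user_id}")
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny illustrative executor: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    // A synchronous closure cannot `.await`, but it can return a future.
    let make_request = |user_id: u32| async move { fetch_balance(user_id).await };

    // Calling the closure runs nothing yet; it only builds the future.
    let pending = make_request(7);

    // Something must drive the future to get the value out.
    let balance = block_on(pending);
    println!("{balance}");
}
```

    In the handler, the runtime behind Sse plays the role of block_on here: it polls the stream, and unfold() polls the future returned by the closure.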
    

    Note, however, that throttle() enforces a 1-second delay between items rather than emitting an item once per second. For example, if get_balance() takes 0.5 s to run, an event is sent every 1.5 s. If you don't want that, you can use tokio::time::interval() and wrap it in IntervalStream:

    pub async fn payment_sse_handler(
        State(state): State<AppState>,
        Extension(current_user): Extension<CurrentUser>,
    ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
        // The interval ticks once per second; its first tick completes immediately.
        let stream =
            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
                // then() awaits the returned future for every tick before taking the next one.
                .then(move |_| {
                    let user_id = current_user.user_id.clone();
                    let db_pool = state.db_pool.clone();
                    async move { Ok(get_balance(&db_pool, user_id).await) }
                });
    
        Sse::new(stream).keep_alive(KeepAlive::default())
    }
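
The timing difference between the throttle() version and the interval() version can be checked with a little arithmetic. The sketch below is a simplified, std-only model, not the real tokio code: the function names are made up for illustration, times are in milliseconds, and it assumes that the interval's first tick fires immediately and that a late tick fires as soon as possible (tokio's default missed-tick behaviour).

```rust
/// Emission times when a fixed delay is enforced between items:
/// the next query only starts once the delay after the previous item has passed.
fn throttled_emit_times(work_ms: u64, delay_ms: u64, count: usize) -> Vec<u64> {
    let mut times = Vec::with_capacity(count);
    let mut now: u64 = 0;
    for i in 0..count {
        if i > 0 {
            now += delay_ms; // wait after the previous item
        }
        now += work_ms; // run the query
        times.push(now);
    }
    times
}

/// Emission times when a timer ticks at a fixed period (first tick at 0)
/// and the query runs after each tick.
fn interval_emit_times(work_ms: u64, period_ms: u64, count: usize) -> Vec<u64> {
    let mut times = Vec::with_capacity(count);
    let mut now: u64 = 0;
    for i in 0..count {
        let tick = i as u64 * period_ms;
        let start = now.max(tick); // wait for the tick, or start at once if it was missed
        now = start + work_ms; // run the query
        times.push(now);
    }
    times
}

fn main() {
    // Query takes 0.5 s, delay/period is 1 s.
    println!("{:?}", throttled_emit_times(500, 1000, 4)); // [500, 2000, 3500, 5000]
    println!("{:?}", interval_emit_times(500, 1000, 4)); // [500, 1500, 2500, 3500]
}
```

With a 0.5 s query the throttled stream emits every 1.5 s, while the interval-driven stream emits every 1 s. In this model, if the query takes longer than the period, the interval version falls back to running queries back to back.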