I have an SSE endpoint written in Axum. I want to send an event every second containing a snapshot of a table in my Postgres database. My current code:
pub async fn payment_sse_handler(
    State(mut state): State<AppState>,
    Extension(current_user): Extension<CurrentUser>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream =
        stream::repeat_with(move || get_balance(&state.db_pool, current_user.user_id.clone()).await) // <-- Error here
            .map(Ok)
            .throttle(Duration::from_secs(1));
    Sse::new(stream).keep_alive(KeepAlive::default())
}

pub async fn get_balance(db_pool: &Pool<Postgres>, user_id: String) -> Event {
    let transaction_history = sqlx::query_as!(
        Transaction,
        r#"select transaction_id, status, currency, amount, user_id, created_at, processing_fee
        from transaction_history
        WHERE user_id=$1
        ORDER BY created_at
        LIMIT 5"#,
        user_id
    )
    .fetch_all(db_pool)
    .await;
    match transaction_history {
        Ok(transaction_history) => match serde_json::to_string(&transaction_history) {
            Ok(transaction_history_str) => Event::default().data(transaction_history_str),
            Err(_) => Event::default().data("Error"),
        },
        Err(_) => Event::default().data("Error"),
    }
}
The problem is that this code gives me the error: "await is only allowed inside async functions and blocks". I think the error occurs because || creates a closure that is not async, but when I try to make it async I just get new errors. Does anyone know how to fix this, or am I doing something fundamentally wrong with this approach?
repeat_with() takes a synchronous closure, not an asynchronous one, so you cannot await inside it. To build a stream from an async function, use unfold():
pub async fn payment_sse_handler(
    State(state): State<AppState>,
    Extension(current_user): Extension<CurrentUser>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // The closure is called once per item; it clones what the future needs
    // so that the returned async block owns its data.
    let stream = stream::unfold((), move |()| {
        let user_id = current_user.user_id.clone();
        let db_pool = state.db_pool.clone();
        async move { Some((get_balance(&db_pool, user_id).await, ())) }
    })
    .map(Ok)
    .throttle(Duration::from_secs(1));
    Sse::new(stream).keep_alive(KeepAlive::default())
}
However, note that this waits 1 second between responses rather than sending one response every second. For example, if get_balance() takes 0.5s to execute, a response is sent every 1.5s. If you don't want that, you can use tokio::time::interval() and wrap it in IntervalStream:
pub async fn payment_sse_handler(
    State(state): State<AppState>,
    Extension(current_user): Extension<CurrentUser>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Each tick of the interval starts a new query.
    let stream =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
            .then(move |_| {
                let user_id = current_user.user_id.clone();
                let db_pool = state.db_pool.clone();
                async move { Ok(get_balance(&db_pool, user_id).await) }
            });
    Sse::new(stream).keep_alive(KeepAlive::default())
}
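To make the timing difference between the two versions concrete, here is a rough, std-only sketch that computes when each response would go out. It is a simplified model and not the actual tokio implementation: the function names throttled_emit_times and interval_emit_times are made up for illustration, the first item is assumed to be produced without an initial delay, and the query is assumed to always take the same time and to finish before the next tick.

```rust
use std::time::Duration;

/// Throttle-style schedule: after each item, wait `delay`, and only then
/// start the work for the next item.
/// Assumption: the first item is produced without an initial delay.
fn throttled_emit_times(work: Duration, delay: Duration, count: usize) -> Vec<Duration> {
    let mut times = Vec::with_capacity(count);
    let mut now = Duration::ZERO;
    for i in 0..count {
        if i > 0 {
            now += delay; // wait out the throttle delay first
        }
        now += work; // then run the query
        times.push(now);
    }
    times
}

/// Interval-style schedule: work starts on a fixed tick (0, period, 2 * period, ...).
/// Assumption: `work` is shorter than `period`, so no tick is ever missed.
fn interval_emit_times(work: Duration, period: Duration, count: usize) -> Vec<Duration> {
    (0..count).map(|i| period * i as u32 + work).collect()
}
```

With a 0.5s query and a 1s delay or period, throttled_emit_times gives 0.5s, 2.0s, 3.5s (one response every 1.5s), while interval_emit_times gives 0.5s, 1.5s, 2.5s (one response every second).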