I am having trouble using a Stream with actix-web. Here is my code:
fn format_csv_row(row: tiberius::Row) -> Result<web::Bytes, ServerError> { ... }

#[get("/stream/")]
async fn get_stream(
    db_pool: web::Data<bb8::Pool<TiberiusConnectionManager>>,
) -> Result<HttpResponse, ServerError> {
    // Get connection
    let mut conn = db_pool
        .get()
        .await
        .map_err(|e| ServerError::PoolUnavailable)?;

    // Execute query
    let stream = conn
        .query("SELECT * FROM table", &[])
        .await
        .map_err(|e| ServerError::QueryFail)?;

    // Build a stream from SQL results
    let stream = stream.map(|x| format_csv_row(x.unwrap()));

    Ok(HttpResponse::Ok().streaming(stream))
}
Here are the methods I call in this function:
The compiler complains with the following errors:
error[E0597]: `db_pool` does not live long enough
   --> src/routes/trades_stream.rs:88:20
    |
88  |       let mut conn = db_pool
    |                      -^^^^^^
    |                      |
    |  ____________________borrowed value does not live long enough
    | |
89  | |         .get()
    | |______________- argument requires that `db_pool` is borrowed for `'static`
...
102 | }
    | - `db_pool` dropped here while still borrowed

error[E0597]: `conn` does not live long enough
   --> src/routes/trades_stream.rs:94:18
    |
94  |       let stream = conn
    |                    -^^^
    |                    |
    |  __________________borrowed value does not live long enough
    | |
95  | |         .query("SELECT * FROM table", &[])
    | |__________________________________________- argument requires that `conn` is borrowed for `'static`
...
102 | }
    | - `conn` dropped here while still borrowed

error: aborting due to 2 previous errors
I understand that the connection and the pool are dropped while streaming is still in progress.
How can I modify my code to make this work?
Is it possible to add explicit lifetimes for db_pool and conn so that they match stream?
To make this work, the stream needs to own the connection it is reading from and, because of how bb8 is written, it also needs to own a handle to the pool. The best way to do this is to use the async-stream crate.
I believe something like this should do it:
use async_stream::try_stream;
// Brings `.next()` into scope; assumes the futures crate is a dependency.
use futures::StreamExt;

fn format_csv_row(row: tiberius::Row) -> Result<web::Bytes, ServerError> { ... }

#[get("/stream/")]
async fn get_stream(
    db_pool: web::Data<bb8::Pool<TiberiusConnectionManager>>,
) -> Result<HttpResponse, ServerError> {
    // Cloning a bb8 pool gives a new handle to the same pool.
    let db_pool = db_pool.clone();

    // The stream owns `db_pool` and `conn`, so both live as long as the stream.
    let stream = try_stream! {
        // Get connection
        let mut conn = db_pool
            .get()
            .await
            .map_err(|_| ServerError::PoolUnavailable)?;

        // Execute query; `mut` is required because `.next()` takes `&mut self`.
        let mut stream = conn
            .query("SELECT * FROM table", &[])
            .await
            .map_err(|_| ServerError::QueryFail)?;

        while let Some(row) = stream.next().await {
            yield format_csv_row(row?)?;
        }
    };

    Ok(HttpResponse::Ok().streaming(Box::pin(stream)))
}
You might need another map_err on the row? part.
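To illustrate the two usual ways of making a `?` compile when the error types differ, here is a minimal std-only sketch. DriverError is a hypothetical stand-in for the driver's error type, and the RowFail variant is invented for the example; neither comes from the original code. Either approach should work inside try_stream! as well, since `?` there converts errors the same way it does in a function returning Result.

```rust
// Hypothetical stand-in for the database driver's error type.
#[derive(Debug)]
struct DriverError(String);

// Simplified application error; `RowFail` is an assumed extra variant.
#[derive(Debug, PartialEq)]
enum ServerError {
    QueryFail,
    RowFail(String),
}

// Option 1: convert explicitly at the call site with map_err.
fn read_row_explicit(row: Result<i32, DriverError>) -> Result<i32, ServerError> {
    let value = row.map_err(|_| ServerError::QueryFail)?;
    Ok(value)
}

// Option 2: implement From once, so that a bare `?` converts automatically.
impl From<DriverError> for ServerError {
    fn from(e: DriverError) -> Self {
        ServerError::RowFail(e.0)
    }
}

fn read_row_implicit(row: Result<i32, DriverError>) -> Result<i32, ServerError> {
    // `?` calls ServerError::from on the DriverError.
    Ok(row?)
}
```

The From impl is usually the less noisy choice when the same conversion is needed in several places; map_err is handy when the mapping depends on where the failure happened.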
Is it possible to add explicit lifetime for db_pool and conn to make them match stream?
No. Lifetime annotations only describe how long values live; they cannot extend it. Instead, you change the structure of the code so that the values actually live long enough, here by moving them into the stream.
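The same idea can be shown without any async machinery. The following is only an analogy, using a std Iterator in place of a Stream and a hypothetical Pool type that is not the bb8 API: the returned iterator is `'static` because it owns its handle to the pool rather than borrowing one from the caller.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a connection pool; not the bb8 API.
struct Pool {
    rows: Vec<String>,
}

// Takes the handle by value and moves it into the closure, so the iterator
// borrows nothing from the caller's stack frame and can be `'static`.
fn rows_owned(pool: Arc<Pool>) -> impl Iterator<Item = String> + 'static {
    let mut index = 0;
    std::iter::from_fn(move || {
        // `pool` lives inside the closure for as long as the iterator exists.
        let row = pool.rows.get(index).cloned();
        index += 1;
        row
    })
}
```

A caller would pass `Arc::clone(&pool)` and could then drop its own handle while the iterator is still in use, which mirrors what the handler does when it returns the stream to actix-web.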