Search code examples
asynchronousruststreamactix-web

Streaming results using Rust actix-web, lifetime issue


I am having issue to use Stream with actix-web using bellow code:

fn format_csv_row(row: tiberius::Row) -> Result<web::Bytes, ServerError> { ... }

#[get("/stream/")]
async fn get_stream(
    db_pool: web::Data<bb8::Pool<TiberiusConnectionManager>>,
) -> Result<HttpResponse, ServerError> {
    // Get connection
    let mut conn = db_pool
        .get()
        .await
        .map_err(|e| ServerError::PoolUnavailable)?;

    // Execute query
    let stream = conn
        .query("SELECT * FROM table", &[])
        .await
        .map_err(|e| ServerError::QueryFail)?;

    // Build a stream from SQL results
    let stream = stream.map(|x| format_csv_row(x.unwrap()));
    Ok(HttpResponse::Ok().streaming(stream))
}

Here are the methods I call in this function:

The compiler complains with the following errors:

error[E0597]: `db_pool` does not live long enough
   --> src/routes/trades_stream.rs:88:20
    |
88  |       let mut conn = db_pool
    |                      -^^^^^^
    |                      |
    |  ____________________borrowed value does not live long enough
    | |
89  | |         .get()
    | |______________- argument requires that `db_pool` is borrowed for `'static`
...
102 |   }
    |   - `db_pool` dropped here while still borrowed

error[E0597]: `conn` does not live long enough
   --> src/routes/trades_stream.rs:94:18
    |
94  |       let stream = conn
    |                    -^^^
    |                    |
    |  __________________borrowed value does not live long enough
    | |
95  | |         .query("SELECT * FROM table", &[])
    | |__________________________________________- argument requires that `conn` is borrowed for `'static`
...
102 |   }
    |   - `conn` dropped here while still borrowed

error: aborting due to 2 previous errors

I understand that connection and pool are dropped while streaming is still in progress.

How can I modify my code to make this works ? Is it possible to add explicit lifetime for db_pool and conn to make them match stream ?


Solution

  • To make this work, we need to change the code such that the stream has ownership of the connection it is reading from, and due to how bb8 is written, you also need ownership of a handle to the pool. The best way to do this is to use the async-stream crate.

    I believe something like this should do it:

    use async_stream::try_stream;
    
    fn format_csv_row(row: tiberius::Row) -> Result<web::Bytes, ServerError> { ... }
    
    #[get("/stream/")]
    async fn get_stream(
        db_pool: web::Data<bb8::Pool<TiberiusConnectionManager>>,
    ) -> Result<HttpResponse, ServerError> {
        // Cloning a bb8 pool gives a new handle to the same pool.
        let db_pool = db_pool.clone();
    
        let stream = try_stream! {
            // Get connection
            let mut conn = db_pool
                .get()
                .await
                .map_err(|e| ServerError::PoolUnavailable)?;
    
            // Execute query
            let stream = conn
                .query("SELECT * FROM table", &[])
                .await
                .map_err(|e| ServerError::QueryFail)?;
    
            while let Some(row) = stream.next().await {
                yield format_csv_row(row?)?;
            }
        };
    
        Ok(HttpResponse::Ok().streaming(Box::pin(stream)))
    }
    

    You might need another map_err on the row? part.

    Is it possible to add explicit lifetime for db_pool and conn to make them match stream ?

    No, you don't change how long things live by setting lifetimes. Instead, you change the lifetimes by changing the structure of the code in a way such that it lives long enough.