Search code examples
rustconnection-poolingrust-tokiohyper

Example usage of hyper with bb8 and postgres


I want to use hyper with bb8 and tokio-postgres. In every request I want to acquire a new connection from the pool. Can anybody provide me some example for this scenario? Currently I do it like this:

fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();

    let pg_mgr =
        PostgresConnectionManager::new("postgresql://auth:auth@localhost:5433/auth", NoTls);

    rt::run(future::lazy(move || {
        Pool::builder()
            .build(pg_mgr)
            .map_err(|e| eprintln!("Database error: {}", e))
            .and_then(move |pool| {
                let service = || service_fn(|req| router(req, pool.clone()));

                let server = Server::bind(&addr)
                    .serve(service)
                    .map_err(|e| eprintln!("Server error: {}", e));

                println!("Listening on http://{}", addr);
                server
            })
    }))
}

fn router(
    _req: Request<Body>,
    _pool: Pool<PostgresConnectionManager<NoTls>>,
) -> Result<Response<Body>, hyper::Error> {
    // do some staff with pool
}

But it won't compile:

error[E0597]: `pool` does not live long enough
  --> src/main.rs:22:63
   |
22 |                 let service = || service_fn(|req| router(req, pool.clone()));
   |                               -- -----------------------------^^^^----------
   |                               |  |                            |
   |                               |  |                            borrowed value does not live long enough
   |                               |  returning this value requires that `pool` is borrowed for `'static`
   |                               value captured here
...
30 |             })
   |             - `pool` dropped here while still borrowed

What am I doing wrong? How to make my case work correctly?


Solution

  • Solution found here

    The simplest solution looks like:

    fn main() {
        let addr = "127.0.0.1:3000".parse().unwrap();
    
        let pg_mgr =
            PostgresConnectionManager::new("postgresql://auth:auth@localhost:5433/auth", NoTls);
    
        rt::run(future::lazy(move || {
            Pool::builder()
                .build(pg_mgr)
                .map_err(|_| eprintln!("kek"))
                .and_then(move |pool| {
                    let service = move || {
                        let pool = pool.clone();
                        service_fn(move |req| router(req, &pool))
                    };
    
                    let server = Server::bind(&addr)
                        .serve(service)
                        .map_err(|e| eprintln!("Server error: {}", e));
    
                    println!("Listening on http://{}", addr);
                    server
                })
        }))
    }
    
    fn router(
        _req: Request<Body>,
        _pool: &Pool<PostgresConnectionManager<NoTls>>,
    ) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
        // some staff
    }
    

    It is also possible to construct service outside of rt::run with Arc and Mutex:

    fn main() {
        let addr = "127.0.0.1:3000".parse().unwrap();
    
        let pg_mgr =
            PostgresConnectionManager::new("postgresql://auth:auth@localhost:5433/auth", NoTls);
        let pool: Arc<Mutex<Option<Pool<PostgresConnectionManager<NoTls>>>>> =
            Arc::new(Mutex::new(None));
        let pool2 = pool.clone();
    
        let service = move || {
            let pool = pool.clone();
            service_fn(move |req| {
                let locked = pool.lock().unwrap();
                let pool = locked
                    .as_ref()
                    .expect("bb8 should be initialized before hyper");
                router(req, pool)
            })
        };
    
        rt::run(future::lazy(move || {
            Pool::builder()
                .build(pg_mgr)
                .map_err(|_| eprintln!("kek"))
                .and_then(move |pool| {
                    *pool2.lock().unwrap() = Some(pool);
    
                    let server = Server::bind(&addr)
                        .serve(service)
                        .map_err(|e| eprintln!("Server error: {}", e));
    
                    println!("Listening on http://{}", addr);
                    server
                })
        }))
    }
    
    fn router(
        _req: Request<Body>,
        _pool: &Pool<PostgresConnectionManager<NoTls>>,
    ) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
        // some staff
    }