Search code examples
rustrust-tokio

Closure is not a Future if a variable is marked 'static


This simple piece of code doesn't build if the address variable is marked 'static

use std::sync::Arc;
use warp::Filter;

type RpcHandler= Arc<dyn Fn() + Sync + Send + 'static>;

pub struct Server {
    handlers: RpcHandler,
}

impl Server {
    pub fn new() -> Self {
        Server {
            handlers: Arc::new(|| {}),
        }
    }

    pub async fn start(self: Arc<Self>, addr: & str) {
        let handlers = Arc::clone(&self.handlers);
        let handlers_filter = warp::any().map(move || Arc::clone(&handlers));

        let ws_route = warp::path("ws").and(warp::ws()).and(handlers_filter).map(
            move |ws: warp::ws::Ws, handlers: Arc<dyn Fn() + Sync + Send>| {
                ws.on_upgrade(move |socket| {
                    let handlers = Arc::clone(&handlers);
                    async move {
                        handlers();
                        drop(socket); // Use the socket in some way to avoid unused variable warning
                    }
                })
            },
        );

        let web_route = warp::path("hello").map(|| warp::reply::html("Hello, World!"));

        warp::serve(ws_route.or(web_route))
            .run(addr.parse::<std::net::SocketAddr>().unwrap())
            .await;
    }
}

impl Default for Server {
    fn default() -> Self {
        Self::new()
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let address: & 'static str = "127.0.0.1:3030";
    
    tokio::spawn(async {
        let server = Arc::new(Server::new());
        
        server.start(address).await;
    });

    Ok(())
}

The error reported is "the closure is not a Future". Full description:

 = note: {closure@src\main.rs:51:25: 51:27} must be a future or must implement `IntoFuture` to be awaited
note: required by a bound in `tokio::spawn`
   --> .cargo\registry\src\index.crates.io-6f17d22bba15001f\tokio-1.38.0\src\task\spawn.rs:166:12
    |
164 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
165 |     where
166 |         F: Future + Send + 'static,
    |            ^^^^^^ required by this bound in `spawn`

The rest of the code is irrelevant. How does the lifetime of the variable affect the matching of a trait somewhere deep in tokio::spawn ?


Solution

  • I don't think the 'static annotation in address has nothing to do with that error, or at least my attempt to reproduce the issue didn't point to that.

    Instead, the error seems to arise from the magic inside warp to flatten the tuples that are sent to map.

    Maybe it is a bug in warp? Certainly it looks like it. If you make RpcHandler a new-type instead of an alias, it just works:

    #[derive(Clone)]
    struct RpcHandler(Arc<dyn Fn() + Sync + Send + 'static>);
    
    //...
        pub async fn start(self: Arc<Self>, addr: & str) {
            let handlers = self.handlers.clone();
            let handlers_filter = warp::any().map(move || handlers.clone());
    
            let ws_route = warp::path("ws").and(warp::ws()).and(handlers_filter).map(
                move |ws: warp::ws::Ws, handlers: RpcHandler| {
                    ws.on_upgrade(move |socket| {
                        let handlers = handlers.clone();
                        async move {
                            handlers.0();
                            drop(socket);
                        }
                    })
                },
            );
    //...
    
    

    Alternatively, the code as written doesn't actually need to inject the handlers as a filter. You can just capture the variable into the inner closure bypassing warp. And that will work with the type alias. Something like this:

        pub async fn start(self: Arc<Self>, addr: &str) {
            let ws_route = warp::path("ws").and(warp::ws()).map(
                // This moves self into the closure.
                // In the real code you may need an extra Arc::clone here.
                move |ws: warp::ws::Ws| {
                    ws.on_upgrade({
                        let handlers = Arc::clone(&self.handlers);
                        move |socket| {
                            async move {
                                handlers();
                                drop(socket);
                            }
                        }
                    })
                }
            );
    

    No warp magic, no problems!