
Passing a variable into a Rust FnMut without having it turn into a FnOnce


I am trying to make a change to the Rust Tonic UDS gRPC client example. It contains the following code block (see link):

let channel = Endpoint::try_from("http://[::]:50051")?
    .connect_with_connector(service_fn(|_: Uri| async {
        let path = "/tmp/tonic/helloworld";

        // Connect to a Uds socket
        Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
    }))
    .await?;

The change is super simple: I just want to pass the path in from the outside rather than hard-coding it. The problem is that the compiler either complains that the closure inside service_fn may outlive any path object defined outside of it or, if I use move to pass it in, complains that service_fn expects an FnMut but is now being given an FnOnce.

Both errors make sense to me: The method can be called from a separate thread, long after we're past the lifetime of any path object defined outside of it. And, if we move it into the function, that move can only be done once, so the function can't be called multiple times any more as it might need to be for servicing multiple requests.

I found an answer here on SO that seems to do exactly what I need, but I just can't get it to work. I'm sure I'm missing something obvious, but I just can't figure out what it is.


Solution

  • The simplest approach is to clone everything. When everything is an owned value you don't have to deal with lifetimes whatsoever.

        // for demonstrating the more general case, I'm using a non-Copy type here
        let path = PathBuf::from("your/path/here");
    
        let channel = Endpoint::try_from("http://[::]:50051")?
            .connect_with_connector(service_fn(move |_: Uri| {
                let path = path.clone();
                async move {
                    // Connect to a Uds socket
                    Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
                }
            }))
            .await?;
    

    This may be appropriate if your code isn't performance sensitive or if the data is small. If you want to avoid cloning the data itself on every call, you have to do a bit more work:

        let path = PathBuf::from("your/path/here");
        let path_shared = Arc::new(path);
    
        let channel = Endpoint::try_from("http://[::]:50051")?
            .connect_with_connector(service_fn(move |_: Uri| {
                let path_shared = Arc::clone(&path_shared);
                async move {
                    // Connect to a Uds socket
                    Ok::<_, std::io::Error>(TokioIo::new(
                        UnixStream::connect(path_shared.as_ref()).await?,
                    ))
                }
            }))
            .await?;
    

    Let's see how we got here. We'll start with a naive implementation, and let the compiler guide us.


    This is what we want, written as simply as possible:

        let path = PathBuf::from("your/path/here");
    
        let channel = Endpoint::try_from("http://[::]:50051")?
            .connect_with_connector(service_fn(|_: Uri| {
                async {
                    // Connect to a Uds socket
                    Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
                }
            }))
            .await?;
    

    And this doesn't compile:

    error[E0525]: expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
      --> examples/src/uds/client.rs:26:44
       |
    26 |         .connect_with_connector(service_fn(|_: Uri| {
       |          ----------------------            ^^^^^^^^ this closure implements `FnOnce`, not `FnMut`
       |          |
       |          the requirement to implement `FnMut` derives from here
    ...
    29 |                 Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
       |                                                                          ---- closure is `FnOnce` because it moves the variable `path` out of its environment
       |
       = note: required for `ServiceFn<{closure@examples/src/uds/client.rs:26:44: 26:52}>` to implement `Service<Uri>`
    

    Okay, this makes sense. We moved an external value into the closure and consumed it, so of course the closure is FnOnce. Since UnixStream::connect is generic over its path argument, let's try passing it &path instead.

    Note that the move keyword is not necessary for a closure to capture by value; it only forces capturing by value in cases where it normally wouldn't (ref).
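
    The capture rules can be seen in isolation with a minimal sketch that uses only the standard library. The names call_twice, call_once and the three demo functions are hypothetical helpers introduced for illustration; they are not part of tonic or tower.

```rust
/// Calls `f` twice, so it only accepts closures that implement `FnMut`.
fn call_twice<F: FnMut() -> usize>(mut f: F) -> (usize, usize) {
    (f(), f())
}

/// Calls `f` once, so it accepts any closure, including `FnOnce`-only ones.
fn call_once<F: FnOnce() -> String>(f: F) -> String {
    f()
}

/// No `move` keyword, yet `name` is captured by value because the body
/// moves it out. The closure is therefore `FnOnce` only.
fn consume_without_move() -> String {
    let name = String::from("socket");
    let give_away = || name;
    // Passing a closure like this where `FnMut` is required fails with E0525.
    call_once(give_away)
}

/// The body only reads `name`, so the closure borrows it and is callable repeatedly.
fn borrow_only() -> (usize, usize) {
    let name = String::from("socket");
    call_twice(|| name.len())
}

/// `move` forces capture by value, but the body still only reads `name`,
/// so the closure owns the string and can still be called repeatedly.
fn move_but_only_read() -> (usize, usize) {
    let name = String::from("socket");
    call_twice(move || name.len())
}
```

    What decides between FnOnce and FnMut is what the body does with the captured value, not whether move is present. Back in the walkthrough, the attempt with &path looks like this: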

        let path = PathBuf::from("your/path/here");
    
        let channel = Endpoint::try_from("http://[::]:50051")?
            .connect_with_connector(service_fn(|_: Uri| {
                async {
                    // Connect to a Uds socket
                    Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
                }
            }))
            .await?;
    
    error[E0373]: closure may outlive the current function, but it borrows `path`, which is owned by the current function
      --> examples/src/uds/client.rs:26:44
       |
    26 |         .connect_with_connector(service_fn(|_: Uri| {
       |                                            ^^^^^^^^ may outlive borrowed value `path`
    ...
    29 |                 Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
       |                                                                           ---- `path` is borrowed here
       |
    note: function requires argument type to outlive `'static`
      --> examples/src/uds/client.rs:26:33
       |
    26 |           .connect_with_connector(service_fn(|_: Uri| {
       |  _________________________________^
    27 | |             async {
    28 | |                 // Connect to a Uds socket
    29 | |                 Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
    30 | |             }
    31 | |         }))
       | |__________^
    help: to force the closure to take ownership of `path` (and any other referenced variables), use the `move` keyword
       |
    26 |         .connect_with_connector(service_fn(move |_: Uri| {
       |                                            ++++
    

    Okay, lifetime issue, makes sense. The closure could run on a thread long after this function has returned. Let's add move.

        let path = PathBuf::from("your/path/here");
    
        let channel = Endpoint::try_from("http://[::]:50051")?
            .connect_with_connector(service_fn(move |_: Uri| {
                async {
                    // Connect to a Uds socket
                    Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
                }
            }))
            .await?;
    
    error: lifetime may not live long enough
      --> examples/src/uds/client.rs:27:13
       |
    26 |           .connect_with_connector(service_fn(move |_: Uri| {
       |                                              -------------
       |                                              |           |
       |                                              |           return type of closure `{async block@examples/src/uds/client.rs:27:13: 27:18}` contains a lifetime `'2`
       |                                              lifetime `'1` represents this closure's body
    27 | /             async {
    28 | |                 // Connect to a Uds socket
    29 | |                 Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
    30 | |             }
       | |_____________^ returning this value requires that `'1` must outlive `'2`
       |
       = note: closure implements `Fn`, so references to captured variables can't escape the closure
    

    This error message may appear a bit cryptic, especially the annotations on '1 and '2. But the note at the bottom is telling: the async block we return isn't allowed to hold a reference to data owned by the closure. Async blocks capture variables using the same rules as closures (ref), so let's make it async move.

        let path = PathBuf::from("your/path/here");
    
        let channel = Endpoint::try_from("http://[::]:50051")?
            .connect_with_connector(service_fn(move |_: Uri| {
                async move {
                    // Connect to a Uds socket
                    Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
                }
            }))
            .await?;
    
    error[E0525]: expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
      --> examples/src/uds/client.rs:26:44
       |
    26 |         .connect_with_connector(service_fn(move |_: Uri| {
       |          ----------------------            ^^^^^^^^^^^^^ this closure implements `FnOnce`, not `FnMut`
       |          |
       |          the requirement to implement `FnMut` derives from here
    ...
    29 |                 Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&path).await?))
       |                                                                           ---- closure is `FnOnce` because it moves the variable `path` out of its environment
       |
       = note: required for `ServiceFn<{closure@examples/src/uds/client.rs:26:44: 26:57}>` to implement `Service<Uri>`
    

    Okay, so our closure is FnOnce again. Why? Again the compiler tells us: it's because we moved path into our async block. We can't do that. For the closure to be FnMut (and in fact, Fn), path has to stay in the closure while each returned future still gets something it owns. In other words, we want shared ownership. In Rust, we do that using reference-counted smart pointers. Let's do that.

        let path = PathBuf::from("your/path/here");
        let path_shared = Rc::new(path);
    
        let channel = Endpoint::try_from("http://[::]:50051")?
            .connect_with_connector(service_fn(move |_: Uri| {
                // path_shared gets moved into the closure
                // then we create another owned pointer to the same data
                let path_shared_2 = Rc::clone(&path_shared);
                async move {
                    // Connect to a Uds socket
                    // path_shared_2 gets moved into the async block
                    // but path_shared stays put
                    Ok::<_, std::io::Error>(TokioIo::new(
                        UnixStream::connect(path_shared_2.as_ref()).await?,
                    ))
                }
            }))
            .await?;
    
    error[E0277]: `Rc<PathBuf>` cannot be sent between threads safely
       --> examples/src/uds/client.rs:27:33
        |
    27  |           .connect_with_connector(service_fn(move |_: Uri| {
        |  __________----------------------_^
        | |          |
        | |          required by a bound introduced by this call
    28  | |             // path_shared gets moved into the closure
    29  | |             // then we create another owned pointer to the same data
    30  | |             let path_shared_2 = Rc::clone(&path_shared);
    ...   |
    38  | |             }
    39  | |         }))
        | |__________^ `Rc<PathBuf>` cannot be sent between threads safely
        |
        = help: within `ServiceFn<{closure@examples/src/uds/client.rs:27:44: 27:57}>`, the trait `Send` is not implemented for `Rc<PathBuf>`, which is required by `ServiceFn<{closure@examples/src/uds/client.rs:27:44: 27:57}>: Send`
    note: required because it's used within this closure
       --> examples/src/uds/client.rs:27:44
        |
    27  |         .connect_with_connector(service_fn(move |_: Uri| {
        |                                            ^^^^^^^^^^^^^
    note: required because it appears within the type `ServiceFn<{closure@examples/src/uds/client.rs:27:44: 27:57}>`
       --> /home/cyq/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tower-0.4.13/src/util/service_fn.rs:54:12
        |
    54  | pub struct ServiceFn<T> {
        |            ^^^^^^^^^
    note: required by a bound in `Endpoint::connect_with_connector`
       --> /home/cyq/Repos/Public/tonic/tonic/src/transport/channel/endpoint.rs:366:27
        |
    364 |     pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
        |                  ---------------------- required by a bound in this associated function
    365 |     where
    366 |         C: Service<Uri> + Send + 'static,
        |                           ^^^^ required by this bound in `Endpoint::connect_with_connector`
    

    So Endpoint::connect_with_connector wants a Send connector. What does this mean? The documentation of Send tells us: it's a type that can be transferred across thread boundaries (ref). The specific reason it wants this is that tokio by default uses a multi-threaded executor, which may move work between threads. Tokio can be configured to run single-threaded, but that's seldom done in practice, and the Send bound is part of connect_with_connector's signature either way.

    So why isn't our connector Send? Again the error tells us: the closure holds an Rc<PathBuf>, and Rc<PathBuf> cannot be sent between threads safely. Why isn't Rc Send? We can find it in the docs:

    Rc uses non-atomic reference counting. This means that overhead is very low, but an Rc cannot be sent between threads, and consequently Rc does not implement Send. As a result, the Rust compiler will check at compile time that you are not sending Rcs between threads. If you need multi-threaded, atomic reference counting, use sync::Arc.
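
    The difference between the two pointer types can be checked without tonic or tokio. The following is a minimal sketch using only the standard library; require_send, arc_is_send and path_len_on_other_thread are hypothetical helpers, and the paths are placeholders.

```rust
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;

/// Compiles only if `T` can be sent to another thread.
fn require_send<T: Send>(value: T) -> T {
    value
}

/// `Arc<PathBuf>` satisfies the `Send` bound.
fn arc_is_send() -> Arc<PathBuf> {
    // Swapping `Arc` for `std::rc::Rc` here fails to compile with E0277,
    // the same error as in the walkthrough.
    require_send(Arc::new(PathBuf::from("/tmp/example.sock")))
}

/// Shares a path with a worker thread through an `Arc` and reads it there.
fn path_len_on_other_thread(path: PathBuf) -> usize {
    let shared = Arc::new(path);
    // Cloning the `Arc` bumps an atomic counter; the path data is not copied.
    let for_thread = Arc::clone(&shared);
    let handle = thread::spawn(move || for_thread.as_os_str().len());
    handle.join().expect("worker thread panicked")
}
```

    std::thread::spawn places the same kind of Send + 'static requirement on its closure, which is why it serves as a stand-in here.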

    So let's use Arc instead.

        let path = PathBuf::from("your/path/here");
        let path_shared = Arc::new(path);
    
        let channel = Endpoint::try_from("http://[::]:50051")?
            .connect_with_connector(service_fn(move |_: Uri| {
                let path_shared_2 = Arc::clone(&path_shared);
                async move {
                    // Connect to a Uds socket
                    Ok::<_, std::io::Error>(TokioIo::new(
                        UnixStream::connect(path_shared_2.as_ref()).await?,
                    ))
                }
            }))
            .await?;
    

    And with that, we have arrived at our final code. Note that I used the name path_shared_2 here for clarity; in practice people usually just shadow path_shared like so: let path_shared = Arc::clone(&path_shared);.
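
    The same pattern can be tried without a gRPC setup. The sketch below is an assumption-laden stand-in rather than tonic's API: connect_twice is a hypothetical function that imitates the FnMut + Send + 'static requirement by calling the closure twice, and block_on is a toy executor that only works for futures that never actually wait.

```rust
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical stand-in for a connector API: it needs to call the closure
/// more than once and requires both closure and future to be `Send + 'static`.
fn connect_twice<F, Fut>(mut connector: F) -> Vec<String>
where
    F: FnMut(u32) -> Fut + Send + 'static,
    Fut: Future<Output = String> + Send + 'static,
{
    (1..=2).map(|attempt| block_on(connector(attempt))).collect()
}

/// Same shape as the final code: the `Arc` stays in the closure and each
/// returned future owns its own clone of the pointer.
fn demo(path: PathBuf) -> Vec<String> {
    let path_shared = Arc::new(path);
    connect_twice(move |attempt: u32| {
        let path_shared = Arc::clone(&path_shared); // shadowing, as described above
        async move { format!("attempt {attempt}: {}", path_shared.display()) }
    })
}
```

    Calling demo(PathBuf::from("/tmp/demo.sock")) yields one string per attempt, which shows that the closure was invoked twice with the same shared path.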


    The defining characteristic of Rust is that it won't compile your code until it is satisfied that everything is sound, and getting there is hard. Some people find it frustrating, but personally I like it because I can rest assured that there's not going to be an urgent call that wakes me up at 0300. And as you can see here, the Rust compiler often does a tremendous job of guiding you to the right answer if you follow its advice step by step.