Search code examples
rust rpc rust-tokio rust-rocket capnproto

future cannot be sent between threads safely in Rust


I am using Rocket together with Cap'n Proto: I receive a request through a web API (Rocket) and have to forward it to another server via RPC (Cap'n Proto).

Here is my main Rocket setup:

/// Entry point: mounts the `invoke` route under `/API`, launches Rocket and awaits it.
#[rocket::main]
async fn main() {
    let server = rocket::build().mount("/API", routes![invoke]);
    // The launch outcome is converted with `ok()` and discarded.
    server.launch().await.ok();
}

Here is my route handler:

#[post("/API", data = "<request>")]
async fn invoke(request: api_request::APIRequest<'_>)Result<Json<api_response::ApiResponse>, Json<api_response::ApiResponseError>>{

let result = capnp_rpc::client::run_client(String::from("Hello World")).await;
}

And here is my Cap'n Proto code. It is pretty much the hello-world example they provide; it works fine on its own, but once I put it into my Rocket project it fails to compile. As you can see, everything is wrapped in the Tokio library.

/// Sends `message` as the name of a `say_hello` request to the RPC server at
/// `127.0.0.1:4000` and returns the reply text.
///
/// The exchange runs inside a `LocalSet` that is awaited in place, so the
/// `LocalSet` is held across the `.await`.
///
/// # Errors
/// Returns any connection, RPC or decoding error raised by a `?` in the exchange.
///
/// # Panics
/// Panics if the hard-coded address cannot be resolved or `set_nodelay` fails.
pub async fn run_client(message: String) -> Result<String, Box<dyn std::error::Error>> {
    let addr = String::from("127.0.0.1:4000")
        .to_socket_addrs()
        .unwrap()
        .next()
        .expect("could not parse address");

    rocket::tokio::task::LocalSet::new()
        .run_until(async move {
            let socket = rocket::tokio::net::TcpStream::connect(&addr).await?;
            socket.set_nodelay(true).unwrap();

            // Adapt the tokio stream to the `futures` I/O traits and split it.
            let (read_half, write_half) =
                tokio_util::compat::TokioAsyncReadCompatExt::compat(socket).split();
            let buffered_reader = futures::io::BufReader::new(read_half);
            let buffered_writer = futures::io::BufWriter::new(write_half);
            let network = Box::new(twoparty::VatNetwork::new(
                buffered_reader,
                buffered_writer,
                rpc_twoparty_capnp::Side::Client,
                Default::default(),
            ));

            let mut rpc_system = RpcSystem::new(network, None);
            let client: hello_world::Client =
                rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);

            // The RPC system is spawned as a local task on the surrounding `LocalSet`.
            rocket::tokio::task::spawn_local(rpc_system);

            let mut call = client.say_hello_request();
            call.get().init_request().set_name(message.as_str());

            let response = call.send().promise.await?;
            let text = response.get()?.get_reply()?.get_message()?.to_str()?;
            println!("received: {}", text);
            Ok(text.to_string())
        })
        .await
}

Here is the full error I am getting:

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
   --> src\infrastructure\capnp_rpc\client.rs:284:16
    |
257 |          rocket::tokio::task::LocalSet::new()
    |          ------------------------------------ has type `LocalSet` which is not `Send`
...
284 |             }).await.unwrap()
    |                ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `*const ()`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
   --> src\infrastructure\capnp_rpc\client.rs:284:16
    |
257 |          rocket::tokio::task::LocalSet::new()
    |          ------------------------------------ has type `LocalSet` which is not `Send`
...
284 |             }).await.unwrap()
    |                ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn StdError`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as it awaits another future which is not `Send`
   --> src\infrastructure\capnp_rpc\client.rs:257:10
    |
257 | /          rocket::tokio::task::LocalSet::new()
258 | |             .spawn_local( async move {
259 | |                 let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
260 | |                 stream.set_nodelay(true).unwrap();
...   |
283 | |                 Ok(reply_message.to_string())
284 | |             }).await.unwrap()
    | |______________^ await occurs here on type `tokio::task::JoinHandle<Result<std::string::String, Box<dyn StdError>>>`, which is not `Send`
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

From what I was able to investigate, Rocket has its own worker threads, and it seems that this code is trying to start another worker thread. I have tried a few things from other questions, such as using a Mutex, but could not make it work.

I am not sure how to wrap this code so that it can work under Rocket's main worker threads.


Solution

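  • After getting help from a few coworkers, I was able to make this work. Here is the code, as I want to share the solution. It is still a little hard for me to understand, but I think the important part is to create a new thread from the main thread and then run the Cap'n Proto code on it. That thread gets its own current-thread Tokio runtime and `LocalSet`, so the non-`Send` RPC types never leave it; only the resulting `String` is passed back to the Rocket handler through a oneshot channel.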

    use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
    use rocket::futures::AsyncReadExt;
    use rocket::*;
    use std::net::ToSocketAddrs;
    use tokio::runtime::Builder;
    use tokio::sync::oneshot;
    use tokio::sync::oneshot::error::RecvError;
    use tokio::task::LocalSet;
    
    /// POST route that logs the request body and performs one RPC call with a
    /// fixed `"Hello World"` message.
    ///
    /// The handler only awaits the `RpcClient` receiver, so the non-`Send` RPC
    /// machinery is not part of the handler's future.
    #[post("/API", data = "<request>")]
    async fn invoke(request: &str) {
        println!("req {}", request);
        let client = RpcClient::new(String::from("Hello World"));
        // Report a failed call instead of silently discarding the result.
        if let Err(error) = client.get_response().await {
            eprintln!("RPC call failed: {}", error);
        }
    }
    
    /// Handle to one RPC call that is executed on a separate thread (see `RpcClient::new`).
    pub struct RpcClient {
        /// Receiving end of the oneshot channel on which the worker thread sends the reply text.
        receiver: oneshot::Receiver<String>,
    }
    
    impl RpcClient {
        /// Starts a `say_hello` call carrying `message` on a dedicated, detached thread.
        ///
        /// The capnp-rpc types are not `Send`, so the whole exchange runs on its own
        /// thread with a current-thread Tokio runtime and a `LocalSet`. Only the reply
        /// `String` crosses back to the caller, through a oneshot channel.
        ///
        /// If the runtime cannot be built or the call fails, the error is printed to
        /// stderr and the sender is dropped, which makes `get_response` return `Err`.
        ///
        /// # Panics
        /// Panics only if the hard-coded server address cannot be resolved.
        fn new(message: String) -> Self {
            let (sender, receiver) = oneshot::channel();
            let server_addr: String = "127.0.0.1:4000".to_string();
            let addr = server_addr
                .to_socket_addrs()
                .expect("hard-coded server address must be valid")
                .next()
                .expect("could not parse address");
            std::thread::spawn(move || {
                // Build the runtime on the worker thread so that a failure cannot
                // panic the request handler that called `new`.
                let rt = match Builder::new_current_thread().enable_all().build() {
                    Ok(rt) => rt,
                    Err(error) => {
                        eprintln!("could not start RPC runtime: {}", error);
                        return;
                    }
                };
                let local = LocalSet::new();
                // `run_until` returns as soon as this future finishes; it does not wait
                // for the spawned RPC system task, so the thread ends once the reply
                // has arrived.
                let outcome = rt.block_on(local.run_until(async move {
                    let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
                    stream.set_nodelay(true)?;
                    let (reader, writer) =
                        tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
                    let rpc_network = Box::new(twoparty::VatNetwork::new(
                        futures::io::BufReader::new(reader),
                        futures::io::BufWriter::new(writer),
                        rpc_twoparty_capnp::Side::Client,
                        Default::default(),
                    ));
                    let mut rpc_system = RpcSystem::new(rpc_network, None);
                    let hello_world: hello_world::Client =
                        rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);

                    // Drive the RPC system as a local task on `local`.
                    rocket::tokio::task::spawn_local(rpc_system);

                    let mut request = hello_world.say_hello_request();
                    request.get().init_request().set_name(&message[..]);

                    let reply = request.send().promise.await?;
                    let reply_message = reply.get()?.get_reply()?.get_message()?.to_str()?;
                    println!("received: {}", reply_message);
                    Ok::<String, Box<dyn std::error::Error>>(reply_message.to_string())
                }));
                match outcome {
                    Ok(reply) => {
                        // The receiver may already be gone (e.g. the request was
                        // cancelled); that is not an error for the worker.
                        let _ = sender.send(reply);
                    }
                    // Dropping `sender` without sending signals the failure to the receiver.
                    Err(error) => eprintln!("RPC call failed: {}", error),
                }
            });
            Self { receiver }
        }

        /// Waits for the reply produced by the worker thread.
        ///
        /// # Errors
        /// Returns the channel error text if the worker ended without sending a reply.
        async fn get_response(self) -> Result<String, String> {
            self.receiver.await.map_err(|error| error.to_string())
        }
    }
    /// Entry point: mounts the `invoke` route under `/API`, launches Rocket and awaits it.
    #[rocket::main]
    async fn main() {
        let server = rocket::build().mount("/API", routes![invoke]);
        // The launch outcome is converted with `ok()` and discarded.
        server.launch().await.ok();
    }
    

    Thanks