Search code examples
rustrust-tokioactix-web

Passing Actix Web Payload to function


I'd like to factor out some payload handling logic from my route handler. I'm not sure how to handle the type appropriately. Specifically, I want to mark the payload as Send safe, but the compiler complains that I cannot send payload across threads. How can I guarantee this Payload will only be handled by the relevant worker thread?

Let's say I have a route

#[post("/upload")]
pub async fn upload(mut body: Payload) -> Result<HttpResponse> {
    Handler {}.handle_payload(body).await?;
}

This is one of many possible handlers, and so I define a simple trait with an implementation.

#[async_trait::async_trait]
pub trait Handle {
    async fn handle_payload(&self, s: impl Stream + Send) -> Result<()>;
}

pub struct Handler {}

#[async_trait::async_trait]
impl Handle for Handler {
    async fn handle_payload(&self, s: impl Stream + Send) -> Result<()> {
        Ok(())
    }
}

This results in the compiler shouting that I can't send Payload between threads. How can I factor an async payload processor out of the route?

    |     MyHandler {}.handle_payload(body).await?;
    |                  -------------- ^^^^ `Rc<RefCell<actix_http::h1::payload::Inner>>` cannot be sent between threads safely
    |                  |
    |                  required by a bound introduced by this call
    |
    = help: within `actix_web::web::Payload`, the trait `std::marker::Send` is not implemented for `Rc<RefCell<actix_http::h1::payload::Inner>>`
    = note: required because it appears within the type `actix_http::h1::payload::Payload`
    = note: required because it appears within the type `actix_web::dev::Payload`
    = note: required because it appears within the type `actix_web::web::Payload`

and

    |     MyHandler {}.handle_payload(body).await?;
    |                  -------------- ^^^^ `(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)` cannot be sent between threads safely
    |                  |
    |                  required by a bound introduced by this call
    |
    = help: the trait `std::marker::Send` is not implemented for `(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)`
    = note: required for `Unique<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>` to implement `std::marker::Send`
    = note: required because it appears within the type `Box<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>`
    = note: required because it appears within the type `Pin<Box<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>>`
    = note: required because it appears within the type `actix_web::dev::Payload`
    = note: required because it appears within the type `actix_web::web::Payload`

Solution

  • Actix web does not require its handlers to be thread-safe. The webserver uses multiple threads, but once a request has been assigned to a thread, it doesn't move. As such, many actix-specific types (like Payload) are not thread-safe since they don't need to be.

    However, async_trait assumes the asynchronous function should be thread-safe by default. You can relax this constraint by passing ?Send in your annotations:

    #[async_trait::async_trait(?Send)]
    pub trait Handle {
        async fn handle_payload(&self, s: impl Stream + Send) -> Result<()>;
    }
    
    pub struct Handler {}
    
    #[async_trait::async_trait(?Send)]
    impl Handle for Handler {
        async fn handle_payload(&self, s: impl Stream + Send) -> Result<()> {
            Ok(())
        }
    }
    

    See Non-threadsafe futures from the async-trait documentation.