I'd like to factor some payload-handling logic out of my route handler, but I'm not sure how to handle the types. Specifically, I tried to require the payload to be Send, but the compiler complains that Payload cannot be sent across threads. How can I guarantee that this Payload will only be handled by the relevant worker thread?
Let's say I have a route:
#[post("/upload")]
pub async fn upload(body: Payload) -> Result<HttpResponse> {
    Handler {}.handle_payload(body).await?;
    Ok(HttpResponse::Ok().finish())
}
This is one of many possible handlers, and so I define a simple trait with an implementation.
#[async_trait::async_trait]
pub trait Handle {
    async fn handle_payload(&self, s: impl Stream + Send) -> Result<()>;
}

pub struct Handler {}

#[async_trait::async_trait]
impl Handle for Handler {
    async fn handle_payload(&self, s: impl Stream + Send) -> Result<()> {
        Ok(())
    }
}
This results in the compiler shouting that I can't send Payload between threads. How can I factor an async payload processor out of the route?
| MyHandler {}.handle_payload(body).await?;
| -------------- ^^^^ `Rc<RefCell<actix_http::h1::payload::Inner>>` cannot be sent between threads safely
| |
| required by a bound introduced by this call
|
= help: within `actix_web::web::Payload`, the trait `std::marker::Send` is not implemented for `Rc<RefCell<actix_http::h1::payload::Inner>>`
= note: required because it appears within the type `actix_http::h1::payload::Payload`
= note: required because it appears within the type `actix_web::dev::Payload`
= note: required because it appears within the type `actix_web::web::Payload`
and
| MyHandler {}.handle_payload(body).await?;
| -------------- ^^^^ `(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)` cannot be sent between threads safely
| |
| required by a bound introduced by this call
|
= help: the trait `std::marker::Send` is not implemented for `(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)`
= note: required for `Unique<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>` to implement `std::marker::Send`
= note: required because it appears within the type `Box<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>`
= note: required because it appears within the type `Pin<Box<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>>`
= note: required because it appears within the type `actix_web::dev::Payload`
= note: required because it appears within the type `actix_web::web::Payload`
Actix Web does not require its handlers to be thread-safe. The web server uses multiple threads, but once a request has been assigned to a thread, it doesn't move. As such, many Actix-specific types (like Payload) are not thread-safe since they don't need to be.
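The thread-per-request model described above can be illustrated with the standard library alone. The following is a minimal sketch, not Actix code: `worker` and `run_workers` are hypothetical names, and the `Rc<RefCell<...>>` stands in for the non-Send state inside Payload. Each thread creates and uses its own `Rc`, so nothing non-Send ever crosses a thread boundary and the compiler accepts it.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::thread;

// Hypothetical per-thread work: the Rc is created, aliased, and dropped
// on the same thread, so it never needs to be Send.
fn worker(id: usize) -> usize {
    let state = Rc::new(RefCell::new(Vec::<usize>::new()));
    let alias = Rc::clone(&state);
    alias.borrow_mut().push(id);
    let len = state.borrow().len();
    len
}

// Only the plain `usize` id moves into each thread and only a `usize`
// comes back; both are Send.
pub fn run_workers(n: usize) -> Vec<usize> {
    let handles: Vec<_> = (0..n)
        .map(|id| thread::spawn(move || worker(id)))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Moving `state` itself into `thread::spawn` would be rejected with the same kind of "cannot be sent between threads safely" error shown in the question.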
However, async_trait assumes the asynchronous function should be thread-safe by default. You can relax this constraint by passing ?Send in your annotations:
#[async_trait::async_trait(?Send)]
pub trait Handle {
    // No `+ Send` bound on the stream: Payload is not Send, so that bound
    // would still reject it at the call site.
    async fn handle_payload(&self, s: impl Stream) -> Result<()>;
}

pub struct Handler {}

#[async_trait::async_trait(?Send)]
impl Handle for Handler {
    async fn handle_payload(&self, s: impl Stream) -> Result<()> {
        Ok(())
    }
}
See the "Non-threadsafe futures" section of the async-trait documentation.
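To see why the `?Send` option matters, it can help to write out by hand roughly what the macro generates. The sketch below is an approximation under stated assumptions, not the macro's exact expansion: `FakePayload`, `LocalBoxFuture`, `block_on`, and `run_demo` are hypothetical names, `FakePayload` is a stand-in for the real Payload (it holds an `Rc<RefCell<...>>` and is therefore not Send), and the tiny executor exists only so the example runs without extra crates. The key point is that the boxed future type has no `+ Send` bound, which is what `?Send` asks for.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A boxed future WITHOUT a `+ Send` bound. By default async_trait would
// require `+ Send` here; `?Send` drops that requirement.
pub type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

// Hypothetical stand-in for Payload: the Rc makes it non-Send.
pub struct FakePayload {
    chunks: Rc<RefCell<Vec<Vec<u8>>>>,
}

pub trait Handle {
    fn handle_payload<'a>(&'a self, body: FakePayload)
        -> LocalBoxFuture<'a, Result<usize, String>>;
}

pub struct Handler;

impl Handle for Handler {
    fn handle_payload<'a>(&'a self, body: FakePayload)
        -> LocalBoxFuture<'a, Result<usize, String>> {
        // The future owns the non-Send body; that is fine because the
        // return type does not promise Send.
        Box::pin(async move {
            let total: usize = body.chunks.borrow().iter().map(|c| c.len()).sum();
            Ok::<usize, String>(total)
        })
    }
}

// Minimal single-threaded executor, only for this demonstration.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Sums the byte lengths of two chunks on the current thread.
pub fn run_demo() -> Result<usize, String> {
    let shared = Rc::new(RefCell::new(vec![b"hello ".to_vec(), b"world".to_vec()]));
    let body = FakePayload { chunks: Rc::clone(&shared) };
    let result = block_on(Handler.handle_payload(body));
    result
}
```

Adding `+ Send` to `LocalBoxFuture` makes the `Box::pin(...)` line fail to compile, because the future captures an `Rc`; that mirrors the error in the question.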