Why is the stream no longer Sync in axum v0.7?


Using axum v0.6, I was able to call the following function from the handler below:

pub async fn create_file(
    &self,
    name: &str,
    reader: Pin<Box<(dyn AsyncRead + Send + Sync)>>,
) -> Result<CreatedFile>

// axum's handler
pub async fn upload(stream: BodyStream) -> Result<Response, StatusCode> {
    let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err));

    let res = file_create("filename", Box::pin(StreamReader::new(stream))).await?;

    Ok((StatusCode::CREATED, "Ok").into_response())
}

Now with axum v0.7 I'm using this:

// axum's handler
pub async fn upload(request: Request) -> Result<Response, StatusCode> {
    let stream = request
        .into_body()
        .into_data_stream()
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err));

    let res = file_create("filename", Box::pin(StreamReader::new(stream))).await?;

    Ok((StatusCode::CREATED, "Ok").into_response())
}

and I'm getting this error:

error[E0277]: `(dyn axum::body::HttpBody<Data = tokio_util::bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
59  |             Box::pin(StreamReader::new(stream)),
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `(dyn axum::body::HttpBody<Data = tokio_util::bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn axum::body::HttpBody<Data = tokio_util::bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)`
    = note: required for `std::ptr::Unique<(dyn axum::body::HttpBody<Data = tokio_util::bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)>` to implement `std::marker::Sync`
note: required because it appears within the type `Box<dyn HttpBody<Data = Bytes, Error = Error> + Send>`
    |
195 | pub struct Box<
    |            ^^^
note: required because it appears within the type `Pin<Box<dyn HttpBody<Data = Bytes, Error = Error> + Send>>`
    |
410 | pub struct Pin<P> {
    |            ^^^
note: required because it appears within the type `UnsyncBoxBody<Bytes, Error>`
    |
17  | pub struct UnsyncBoxBody<D, E> {
    |            ^^^^^^^^^^^^^
note: required because it appears within the type `Body`
    |
39  | pub struct Body(BoxBody);
    |            ^^^^
note: required because it appears within the type `BodyDataStream`
    |
140 | pub struct BodyDataStream {
    |            ^^^^^^^^^^^^^^
note: required because it appears within the type `IntoStream<BodyDataStream>`
    |
12  |     pub struct IntoStream<St> {
    |                ^^^^^^^^^^
note: required because it appears within the type `Map<IntoStream<BodyDataStream>, MapErrFn<{closure@…:48:18}>>`
    |
15  |     pub struct Map<St, F> {
    |                ^^^
note: required because it appears within the type `MapErr<BodyDataStream, {closure@…:48:18}>`
    |
63  |     MapErr<St, F>(
    |     ^^^^^^
note: required because it appears within the type `StreamReader<MapErr<BodyDataStream, {closure@…:48:18}>, Bytes>`
    |
156 | pub struct StreamReader<S, B> {
    |            ^^^^^^^^^^^^
    = note: required for the cast from `Pin<Box<StreamReader<MapErr<BodyDataStream, {closure@…:48:18}>, Bytes>>>` to `std::pin::Pin<std::boxed::Box<(dyn tokio::io::AsyncRead + std::marker::Send + std::marker::Sync + 'static)>>`

Why?


Solution

  • This function argument should be changed:

    reader: Pin<Box<(dyn AsyncRead + Send + Sync)>>,
    

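    Just remove the Sync bound, leaving Pin<Box<dyn AsyncRead + Send>>; it's unnecessary. It only started to matter in axum v0.7 because, as the error trace shows, Body wraps an UnsyncBoxBody (a Pin<Box<dyn HttpBody + Send>>), which is Send but not Sync, so a StreamReader built on top of it can't be Sync either.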

    To understand why it's unnecessary, we first need to understand what Sync does. Sync means "this type can be safely accessed through a shared reference by multiple threads." In other words, &T is Send if and only if T is Sync.
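
    The relationship between Send and Sync can be checked at compile time. The sketch below uses only the standard library; is_send and is_sync are hypothetical helper names, and Cell<u32> stands in for any type that is Send but not Sync.

    ```rust
    use std::cell::Cell;

    // Hypothetical helpers: each call compiles only if `T` satisfies the bound.
    fn is_send<T: Send>() -> bool {
        true
    }

    fn is_sync<T: Sync>() -> bool {
        true
    }

    fn main() {
        // `u32` is Sync, so a shared reference to it is Send.
        assert!(is_sync::<u32>());
        assert!(is_send::<&'static u32>());

        // `Cell<u32>` can be moved to another thread (Send)...
        assert!(is_send::<Cell<u32>>());

        // ...but it is not Sync, so neither of these lines compiles:
        // assert!(is_sync::<Cell<u32>>());
        // assert!(is_send::<&'static Cell<u32>>());

        println!("all bounds hold");
    }
    ```

    Uncommenting either of the last two assertions should produce an E0277 error of the same kind as the one in the question.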

    For this to matter, the type needs to have methods that take self by shared reference (e.g. &self). However, none of the methods of AsyncRead do this; they all require an exclusive reference (technically a pinned exclusive reference, self: Pin<&mut Self>). This means that even if you have a Pin<&dyn AsyncRead>, you won't be able to do anything useful with it. You need an exclusive reference, and whether an exclusive reference can move to another thread depends on Send, not on Sync.
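
    As a rough illustration, here is a standard-library-only sketch of the same situation. ChunkRead is a hypothetical stand-in for AsyncRead (its only method takes &mut self), and CountingReader is a made-up type that is Send but not Sync, much like axum v0.7's Body. Neither name comes from tokio or axum.

    ```rust
    use std::cell::Cell;
    use std::thread;

    /// Hypothetical stand-in for `AsyncRead`: the only method needs `&mut self`.
    trait ChunkRead {
        fn read_chunk(&mut self) -> Option<Vec<u8>>;
    }

    /// The `Cell` field makes this type Send but not Sync.
    struct CountingReader {
        remaining: Cell<usize>,
    }

    impl ChunkRead for CountingReader {
        fn read_chunk(&mut self) -> Option<Vec<u8>> {
            let left = self.remaining.get();
            if left == 0 {
                return None;
            }
            self.remaining.set(left - 1);
            Some(vec![b'x'; 4])
        }
    }

    /// Only `Send` is required: the reader is moved into the worker thread
    /// and used through an exclusive reference there.
    fn drain_on_worker(mut reader: Box<dyn ChunkRead + Send>) -> usize {
        thread::spawn(move || {
            let mut total = 0;
            while let Some(chunk) = reader.read_chunk() {
                total += chunk.len();
            }
            total
        })
        .join()
        .expect("worker thread panicked")
    }

    fn main() {
        let reader = Box::new(CountingReader { remaining: Cell::new(3) });
        let total = drain_on_worker(reader);
        println!("read {total} bytes"); // prints: read 12 bytes
    }
    ```

    Changing the parameter type to Box<dyn ChunkRead + Send + Sync> would reject CountingReader at compile time, even though nothing in the function ever shares a reference to it between threads.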

    Further reading: