Using axum v0.6 I was able to use the below code with the following function:
pub async fn create_file(
&self,
name: &str,
reader: Pin<Box<(dyn AsyncRead + Send + Sync)>>,
) -> Result<CreatedFile>
// axum's handler
pub async fn upload(stream: BodyStream) -> Result<Response, StatusCode> {
let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
let res = file_create("filename", Box::pin(StreamReader::new(stream))).await?;
Ok((StatusCode::CREATED, "Ok").into_response())
}
Now with axum v0.7 I'm using this:
// axum's handler
pub async fn upload(request: Request) -> Result<Response, StatusCode> {
let stream = request
.into_body()
.into_data_stream()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
let res = file_create("filename", Box::pin(StreamReader::new(stream))).await?;
Ok((StatusCode::CREATED, "Ok").into_response())
}
and I'm getting this error:
error[E0277]: `(dyn axum::body::HttpBody<Data = tokio_util::bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)` cannot be shared between threads safely
|
59 | Box::pin(StreamReader::new(stream)),
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `(dyn axum::body::HttpBody<Data = tokio_util::bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)` cannot be shared between threads safely
|
= help: the trait `std::marker::Sync` is not implemented for `(dyn axum::body::HttpBody<Data = tokio_util::bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)`
= note: required for `std::ptr::Unique<(dyn axum::body::HttpBody<Data = tokio_util::bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)>` to implement `std::marker::Sync`
note: required because it appears within the type `Box<dyn HttpBody<Data = Bytes, Error = Error> + Send>`
|
195 | pub struct Box<
| ^^^
note: required because it appears within the type `Pin<Box<dyn HttpBody<Data = Bytes, Error = Error> + Send>>`
|
410 | pub struct Pin<P> {
| ^^^
note: required because it appears within the type `UnsyncBoxBody<Bytes, Error>`
|
17 | pub struct UnsyncBoxBody<D, E> {
| ^^^^^^^^^^^^^
note: required because it appears within the type `Body`
|
39 | pub struct Body(BoxBody);
| ^^^^
note: required because it appears within the type `BodyDataStream`
|
140 | pub struct BodyDataStream {
| ^^^^^^^^^^^^^^
note: required because it appears within the type `IntoStream<BodyDataStream>`
|
12 | pub struct IntoStream<St> {
| ^^^^^^^^^^
note: required because it appears within the type `Map<IntoStream<BodyDataStream>, MapErrFn<{closure@file.rs:48:18}>>`
|
15 | pub struct Map<St, F> {
| ^^^
note: required because it appears within the type `MapErr<BodyDataStream, {closure@file.rs:48:18}>`
|
63 | MapErr<St, F>(
| ^^^^^^
note: required because it appears within the type `StreamReader<MapErr<BodyDataStream, {closure@file.rs:48:18}>, Bytes>`
|
156 | pub struct StreamReader<S, B> {
| ^^^^^^^^^^^^
= note: required for the cast from `Pin<Box<StreamReader<MapErr<BodyDataStream, {closure@file.rs:48:18}>, Bytes>>>` to `std::pin::Pin<std::boxed::Box<(dyn tokio::io::AsyncRead + std::marker::Send + std::marker::Sync + 'static)>>`
Why?
This function argument should be changed:
reader: Pin<Box<(dyn AsyncRead + Send + Sync)>>,
Just remove the Sync
bound as it's unnecessary.
To understand why it's unnecessary, we first need to understand what Sync
does. What Sync
means is "this type can be accessed using a shared reference by multiple threads safely." In other words, &T
is Send
only if T
is Sync
.
For this to matter, the type needs to have methods that take self
by shared reference (e.g. &self
). However, none of the methods of AsyncRead
do this; they all require an exclusive reference (technically a pinned exclusive reference -- Pin<&mut self>
). This means that even if you have a Pin<&dyn AsyncRead>
, you won't be able to do anything useful with it. You would need an exclusive reference, and Sync
is unrelated to exclusive references.
Further reading: