impl AsyncRead for tonic::Streaming


I am trying to take the tonic routeguide tutorial and turn the client into a Rocket server. For now I just take the gRPC response and convert it to a string.

service RouteGuide {
    rpc GetFeature(Point) returns (Feature) {}
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
}

This works well enough for GetFeature. For the ListFeatures query, just as tonic lets the client stream the response, I want to pass that stream on to the Rocket client. I see that Rocket supports streaming responses, but for that I need to implement the AsyncRead trait.

Is there any way to do something like this? Below is a trimmed-down version of what I was doing:

struct FeatureStream {
    stream: tonic::Streaming<Feature>,
}

impl AsyncRead for FeatureStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Write out as utf8 any response messages.
        match Pin::new(&mut self.stream.message()).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(feature) => Poll::Pending,
        }
    }
}

#[get("/list_features")]
async fn list_features(client: State<'_, RouteGuideClient<Channel>>) -> Stream<FeatureStream> {
    let rectangle = Rectangle {
        low: Some(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        high: Some(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    };
    let mut client = client.inner().clone();
    let stream = client
        .list_features(Request::new(rectangle))
        .await
        .unwrap()
        .into_inner();
    Stream::from(FeatureStream { stream })
}

#[rocket::launch]
async fn rocket() -> rocket::Rocket {
    rocket::ignite()
        .manage(
            create_route_guide_client("http://[::1]:10000")
                .await
                .unwrap(),
        )
        .mount("/", rocket::routes![list_features,])
}

This fails with the error:

error[E0277]: `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>` cannot be unpinned
   --> src/web_user.rs:34:15
    |
34  |         match Pin::new(&mut self.stream.message()).poll(cx) {
    |               ^^^^^^^^ within `impl std::future::Future`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>`
    | 
   ::: /home/matan/.cargo/registry/src/github.com-1ecc6299db9ec823/tonic-0.4.0/src/codec/decode.rs:106:40
    |
106 |     pub async fn message(&mut self) -> Result<Option<T>, Status> {
    |                                        ------------------------- within this `impl std::future::Future`
    |
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required by `Pin::<P>::new`

Solution

  • The problem is that the future returned by tonic::Streaming<Feature>::message() doesn't implement Unpin, because message() is an async function. Call this type MessageFuture. Pin::new(&mut MessageFuture) is rejected because Pin::new requires the pointee type to implement Unpin, and MessageFuture doesn't.

    Why is it not safe?

    According to the reference, Unpin is implemented by:

    Types that can be safely moved after being pinned.

    This means that if T: !Unpin, the value behind a Pin<&mut T> must not be moved. This matters because a future created by an async block or async fn has no Unpin implementation: it may hold a reference to one of its own fields. If you moved such a T, the referenced field would move with it, but the reference would still point to the old address. To prevent this, the value must stay in place. See the "Pinning" section of the async book for a visual explanation.

    Note: T: !Unpin means that T does not implement Unpin.
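
To make the Unpin requirement concrete, here is a minimal sketch that uses only the standard library. The names next_message, NoopWake and poll_once are made up for illustration; next_message merely stands in for an async fn such as message(). It shows that a future produced by an async fn can still be polled by hand once it is pinned with Box::pin, which, unlike Pin::new, does not require Unpin.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough to poll a future by hand in a demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for `Streaming::message()`: an `async fn` returns
// an anonymous future type that does not implement `Unpin`.
async fn next_message() -> Option<u32> {
    Some(7)
}

fn poll_once() -> Poll<Option<u32>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let fut = next_message();
    // `Pin::new(&mut fut)` would be rejected with E0277 here, because
    // `Pin::new` requires `Unpin`. `Box::pin` moves the future to the heap,
    // where its address stays fixed, so no `Unpin` bound is needed.
    let mut pinned = Box::pin(fut);
    pinned.as_mut().poll(&mut cx)
}
```

Calling poll_once() polls the boxed future a single time. Note that boxing a fresh future on every poll_read call would, in general, throw away whatever progress the previous future had made, which is one reason polling the stream directly is the simpler route here.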

    Solution

    message() is a helper that fetches the next message from a tonic::Streaming<T>. You don't need to call message() to get the next element: your structure already holds the actual stream.

    struct FeatureStream {stream: tonic::Streaming<Feature>}
    

    You can poll for the next message inside AsyncRead like this:

    impl AsyncRead for FeatureStream {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
    
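            // poll_next_unpin comes from futures::StreamExt, which must be in scope.
            // Placeholder: every arm returns Poll::Pending, as in the question's code.
            // Replace the arms with real handling of messages, errors and end of stream.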
            match self.stream.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(m))) => Poll::Pending,
                Poll::Ready(Some(Err(e))) => Poll::Pending,
                Poll::Ready(None) => Poll::Pending,
                Poll::Pending => Poll::Pending
            }
        }
    }
    

    Note that tonic::Streaming<T> implements Unpin (reference), which is why poll_next_unpin can be called on it here.
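
As a rough idea of what the placeholder arms might turn into, here is a sketch of the buffering logic using only the standard library. Everything in it is an assumption made for illustration: Adapter, StreamPoll and read_step are hypothetical names, a closure stands in for polling the tonic stream, String stands in for both the message and the error, and a Vec<u8> capped at cap bytes stands in for ReadBuf. It is not the tonic or tokio API, only the shape of the control flow.

```rust
use std::io;
use std::task::Poll;

// Stand-in for what polling the gRPC stream would yield.
type StreamPoll = Poll<Option<Result<String, String>>>;

struct Adapter<S> {
    stream: S,         // stand-in for polling tonic::Streaming<Feature>
    leftover: Vec<u8>, // bytes of a message that did not fit in the last read
}

impl<S: FnMut() -> StreamPoll> Adapter<S> {
    fn new(stream: S) -> Self {
        Adapter { stream, leftover: Vec::new() }
    }

    // Mirrors the shape of poll_read: write at most `cap` bytes into `out`.
    fn read_step(&mut self, out: &mut Vec<u8>, cap: usize) -> Poll<io::Result<()>> {
        // Only ask the stream for more once the previous message is drained.
        // Looping also skips empty messages, which would otherwise look like EOF.
        while self.leftover.is_empty() {
            match (self.stream)() {
                Poll::Ready(Some(Ok(msg))) => self.leftover = msg.into_bytes(),
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e)));
                }
                // End of stream: returning Ok without writing anything signals EOF.
                Poll::Ready(None) => return Poll::Ready(Ok(())),
                // Nothing yet: propagate Pending only when the stream itself is pending.
                Poll::Pending => return Poll::Pending,
            }
        }
        // Copy as much as fits and keep the rest for the next call.
        let n = cap.min(self.leftover.len());
        out.extend(self.leftover.drain(..n));
        Poll::Ready(Ok(()))
    }
}
```

In a real poll_read, cap would presumably correspond to buf.remaining() and the copy to buf.put_slice(..) on tokio's ReadBuf. The key design point is that Poll::Pending is returned only when the underlying stream returned Pending, since that is the case in which the stream has arranged for the task to be woken again.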