I am trying to take the tonic routeguide tutorial and turn the client into a Rocket server. I am just taking the gRPC response and converting it to a string.
service RouteGuide {
    rpc GetFeature(Point) returns (Feature) {}
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
}
This works well enough for GetFeature. For the ListFeatures query, just as Tonic lets the client stream the response, I want to pass that stream on to the Rocket client. I see that Rocket supports streaming responses, but for that I need to implement the AsyncRead trait.
Is there any way to do something like this? Below is a trimmed-down version of what I was doing:
struct FeatureStream {
    stream: tonic::Streaming<Feature>,
}
impl AsyncRead for FeatureStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Write out as utf8 any response messages.
        match Pin::new(&mut self.stream.message()).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(feature) => Poll::Pending,
        }
    }
}
#[get("/list_features")]
async fn list_features(client: State<'_, RouteGuideClient<Channel>>) -> Stream<FeatureStream> {
    let rectangle = Rectangle {
        low: Some(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        high: Some(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    };
    let mut client = client.inner().clone();
    let stream = client
        .list_features(Request::new(rectangle))
        .await
        .unwrap()
        .into_inner();
    Stream::from(FeatureStream { stream })
}
#[rocket::launch]
async fn rocket() -> rocket::Rocket {
    rocket::ignite()
        .manage(
            create_route_guide_client("http://[::1]:10000")
                .await
                .unwrap(),
        )
        .mount("/", rocket::routes![list_features,])
}
With the error:
error[E0277]: `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>` cannot be unpinned
  --> src/web_user.rs:34:15
   |
34 |         match Pin::new(&mut self.stream.message()).poll(cx) {
   |               ^^^^^^^^ within `impl std::future::Future`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>`
   |
  ::: /home/matan/.cargo/registry/src/github.com-1ecc6299db9ec823/tonic-0.4.0/src/codec/decode.rs:106:40
    |
106 |     pub async fn message(&mut self) -> Result<Option<T>, Status> {
    |                                        ------------------------- within this `impl std::future::Future`
    |
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required by `Pin::<P>::new`
The problem is that the Future generated by tonic::Streaming<Feature>::message() doesn't implement Unpin, since message() is an async function. Let's call this type MessageFuture. You cannot safely pin a &mut MessageFuture with Pin::new, because the pointee type MessageFuture doesn't implement Unpin.
From the reference, Unpin marks:
Types that can be safely moved after being pinned.
This means that if T: !Unpin, the value behind a Pin<&mut T> must not be moved. This matters because Futures created by async blocks have no Unpin implementation: such a future might hold a reference to one of its own fields. If you moved the future, that field would move with it, but the reference would still point to the old address. To prevent this, the future must not be movable. Please read the "Pinning" section of the async book to visualize the reason.
Note: T: !Unpin means that T is a type that does not implement Unpin.
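To make the Unpin restriction concrete, here is a minimal std-only sketch. The names next_message, poll_once and NoopWaker are hypothetical stand-ins and not part of tonic or Rocket. It shows the usual way to pin a future that does not implement Unpin: move it to the heap with Box::pin instead of calling Pin::new on a mutable reference.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough for polling a future that is ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for `Streaming::message()`: an `async fn` returns an
// anonymous future type that does not implement `Unpin`.
async fn next_message() -> Option<u32> {
    Some(42)
}

// Polls a future once and returns its output if it is already ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // `Pin::new(&mut fut)` would only compile with an extra `F: Unpin` bound.
    // `Box::pin` moves the future to the heap, where it stays at a fixed
    // address, so it works for any future.
    let mut pinned: Pin<Box<F>> = Box::pin(fut);
    match pinned.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn main() {
    let result = poll_once(next_message());
    println!("{:?}", result);
}
```

Inside poll_read this approach would be awkward, because a pinned future has to be stored somewhere between calls rather than created afresh on every poll. That is one reason to poll the stream directly instead.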
The message() function is a helper that picks the next message from the tonic::Streaming<T>. You don't need to call message() to get the next element from the stream, because you already have the actual stream in your structure.
struct FeatureStream {stream: tonic::Streaming<Feature>}
You can poll for the next message inside your AsyncRead implementation like this:
// `poll_next_unpin` comes from the `futures::StreamExt` trait, which must be in scope.
impl AsyncRead for FeatureStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Placeholder: every arm returns Pending, as in your code. Replace the arms
        // with your own handling, e.g. writing the message into `buf`.
        match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(m))) => Poll::Pending,
            Poll::Ready(Some(Err(e))) => Poll::Pending,
            Poll::Ready(None) => Poll::Pending,
            Poll::Pending => Poll::Pending,
        }
    }
}
Please note that tonic::Streaming<T> implements Unpin (reference).
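The placeholder arms still leave open how message bytes get into the caller's buffer. As a rough illustration, here is a synchronous, std-only analogue built on std::io::Read and an iterator of strings. MessageReader is a hypothetical name, and the real poll_read would differ in its details. What the sketch shows is the bookkeeping: keep the bytes that did not fit, and fetch the next message only once the previous one has been written out completely.

```rust
use std::io::{self, Read};

// Hypothetical adapter: turns an iterator of text messages into a byte reader.
struct MessageReader<I> {
    messages: I,
    // Bytes of the current message that have not been handed to the caller yet.
    pending: Vec<u8>,
}

impl<I: Iterator<Item = String>> MessageReader<I> {
    fn new(messages: I) -> Self {
        Self { messages, pending: Vec::new() }
    }
}

impl<I: Iterator<Item = String>> Read for MessageReader<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Fetch the next message only when the previous one is used up.
        // The loop skips empty messages so that Ok(0) is returned only
        // when the source is exhausted.
        while self.pending.is_empty() {
            match self.messages.next() {
                Some(msg) => self.pending = msg.into_bytes(),
                None => return Ok(0), // end of stream
            }
        }
        // Copy as much as fits and keep the rest for the next call.
        let n = self.pending.len().min(buf.len());
        buf[..n].copy_from_slice(&self.pending[..n]);
        self.pending.drain(..n);
        Ok(n)
    }
}

fn main() -> io::Result<()> {
    let messages = vec!["feature one\n".to_string(), "feature two\n".to_string()];
    let mut reader = MessageReader::new(messages.into_iter());
    let mut out = String::new();
    reader.read_to_string(&mut out)?;
    print!("{}", out);
    Ok(())
}
```

In the asynchronous version the same steps would presumably apply, with Poll::Pending from the stream passed through unchanged, ReadBuf::put_slice in place of copy_from_slice, and Poll::Ready(Ok(())) returned without writing anything to signal the end of the stream.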