I'm trying to implement a grpc server that returns a stream using Tonic in Rust. Let's say we have a service like this:
rpc FooBar(Input) returns (stream Output) {}
}
I generate the code for the boilerplate using prost-build. Following the examples on the site, I could implement the service:
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
#[derive(Default)]
pub struct FooServer;
#[async_trait]
impl FooService for FooServer {
type FooBarStream = ReceiverStream<Result<Output, Status>>;
async fn foo_bar(
&self,
request: Request<Input>,
) -> std::result::Result<Response<Self::FooBarStream>, Status> {
let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
for ... {
tx.send(Ok(...)).await;
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
This works, but I think it's a bit ugly. Having found out about async-stream, I thought of rewriting it like this, as it's less boilerplate and easier to read. Possibly also more efficient for not needing an IPC but filling the stream directly.
use async_stream::{stream, AsyncStream};
...
#[async_trait]
impl FooService for FooServer {
type FooBarStream = AsyncStream?????
async fn foo_bar(
&self,
request: Request<Input>,
) -> std::result::Result<Response<Self::FooBarStream>, Status> {
let stream = stream! {
for ... {
yield ...;
}
});
Ok(stream)
}
}
The problem with this is that I cannot figure out the type of the stream. I tried AsyncStream<Result<Output, Status>>
, but it doesn't work because AsyncStream
has 2 template parameters, the second one being the function that generates the result, which in this case is an unnamed function.
Is there any way to make this work?
Since the complete AsyncStream
is an unnamable type and since the associated-type-position-impl-trait syntax is not yet available (i.e. type FooBarStream = impl Stream<...>;
), you'll have to do the next best thing; use a trait object.
The best way to do that is with Box::pin
:
#[async_trait]
impl FooService for FooServer {
type FooBarStream = Pin<Box<dyn Stream<Item = Result<Output, Status>> + Send + 'static>>;
async fn foo_bar(
&self,
request: Request<Input>,
) -> std::result::Result<Response<Self::FooBarStream>, Status> {
let stream = stream! {
for ... {
yield ...;
}
});
Ok(Response::new(Box::pin(stream) as Self::FooBarStream))
}
}
You can see this done in the Bidirectional streaming RPC part of the Tonic guide, though they do use try_stream!
in order to use ?
for errors.