Tags: rust, rust-futures, rust-tonic

Unpinning a BoxStream to send using Tonic gRPC stream


I am new to Rust and am writing a simple application that streams some values over gRPC using Tonic. These values are initially acquired from an external library as a BoxStream (Pin<Box<Stream>>), and tonic's API requires something that implements Stream (which, as far as I can tell from the error below, my pinned box does not satisfy).

Tonic's streaming example uses a ReceiverStream to convert an mpsc channel into a stream, and spawns a separate task to push values into it. This would require a stream lifetime of 'static, which is not an option for my actual implementation because the lifetime of my stream is tied to the struct that returns it.

What is the best way to provide something that implements Stream, that I can give to Tonic, from my Pin<Box<Stream>>?

src/main.rs (This will not compile, since BoxStream<'static, Entry> does not implement IntoStreamingRequest)

use futures::prelude::stream::BoxStream;
use async_stream::stream;
use tonic::{IntoStreamingRequest};

struct Entry {
    key: String,
}

fn main() {
    // Create Request
    let stream: BoxStream<'static, Entry> = api_function();
    let request = stream.into_streaming_request();

    // Send request
    //let mut client = DataImporterClient::connect("http://[::1]:50051").await.unwrap();
    //let response = client.grpc_function(request).await?;
}

fn api_function() -> BoxStream<'static, Entry> {
    Box::pin(stream! {
        let entries = vec!(
            Entry {key: String::from("value1")},
            Entry {key: String::from("value2")}
        );

        for entry in entries {
            yield entry;
        }
    })
}

Cargo.toml

[package]
name = "tonic-streaming-minimum-example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tonic = "0.5"
futures = "0.3"
tokio-stream = "0.1"
async-stream = "0.3"

Compilation error:

error[E0599]: the method `into_streaming_request` exists for struct `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>`, but its trait bounds were not satisfied
   --> src\main.rs:12:26
    |
12  |     let request = stream.into_streaming_request();
    |                          ^^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>` due to unsatisfied trait bounds
    |
   ::: C:\Users\tmathews\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
    |
408 | pub struct Pin<P> {
    | -----------------
    | |
    | doesn't satisfy `_: IntoStreamingRequest`
    | doesn't satisfy `_: Sync`
    |
   ::: C:\Users\tmathews\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.17\src\stream.rs:27:1
    |
27  | pub trait Stream {
    | ----------------
    | |
    | doesn't satisfy `_: IntoStreamingRequest`
    | doesn't satisfy `_: Sized`
    | doesn't satisfy `_: Sync`
    |
    = note: the following trait bounds were not satisfied:
            `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
            which is required by `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: futures::Stream`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: std::marker::Send`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: Sync`
            which is required by `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: IntoStreamingRequest`
            `dyn futures::Stream<Item = Entry> + std::marker::Send: Sized`
            which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
            which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: futures::Stream`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: std::marker::Send`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): futures::Stream`
            which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
            `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): Sync`
            which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`



Solution

  • The problem is that tonic (0.5, the version in the Cargo.toml above) implements IntoStreamingRequest only for streams that are both Send and Sync:

    impl<T> IntoStreamingRequest for T
    where
        T: Stream + Send + Sync + 'static
    

    But BoxStream only promises Send, not Sync:

    pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
    

    Instead of using BoxStream, copy its definition and add a + Sync bound:

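    // Needs `use std::pin::Pin;` and `use futures::Stream;` in addition to
    // the imports from the question.
    fn api_function() -> Pin<Box<dyn Stream<Item = Entry> + Send + Sync + 'static>> {
        Box::pin(stream! {
            let entries = vec![
                Entry { key: String::from("value1") },
                Entry { key: String::from("value2") },
            ];

            // Yield each entry to the consumer of the stream.
            for entry in entries {
                yield entry;
            }
        })
    }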
    

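    In this example the stream produced by the stream!() macro is already Send + Sync, so with the changed return type the code compiles. Send and Sync are auto traits that depend on what the stream body holds, so a different stream may not qualify.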
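
To see the Sync difference in isolation, here is a minimal std-only sketch. It is an illustration under assumptions, not tonic's code: `Iterator` stands in for `Stream`, and `accept`, `SendOnly`, `SendSync`, `entries` and `demo` are hypothetical names whose bound merely mirrors the `Stream + Send + Sync + 'static` impl quoted above.

```rust
// Hypothetical stand-in for tonic 0.5's bound `T: Stream + Send + Sync + 'static`.
// `Iterator` plays the role of `Stream` so the sketch needs no external crates.
fn accept<T>(values: T) -> Vec<String>
where
    T: Iterator<Item = String> + Send + Sync + 'static,
{
    values.collect()
}

// Mirrors the shape of `BoxStream`: the trait object promises `Send` only.
type SendOnly = Box<dyn Iterator<Item = String> + Send + 'static>;

// Same shape with the extra `+ Sync` promise, as in the fix above.
type SendSync = Box<dyn Iterator<Item = String> + Send + Sync + 'static>;

// The concrete iterator is both `Send` and `Sync`; the alias decides
// how much of that is still visible after boxing.
fn entries() -> std::vec::IntoIter<String> {
    vec![String::from("value1"), String::from("value2")].into_iter()
}

fn send_only() -> SendOnly {
    Box::new(entries())
}

fn send_sync() -> SendSync {
    Box::new(entries())
}

pub fn demo() -> Vec<String> {
    // The `Send`-only box is still usable where `Sync` is not required.
    let _count = send_only().count();

    // accept(send_only()); // rejected: the trait object is not `Sync`

    // With `+ Sync` in the trait object the bound is satisfied.
    accept(send_sync())
}
```

Uncommenting the `accept(send_only())` line should make the compiler reject the program for the same reason as in the question: the boxed trait object only promises Send, even though the value inside it happens to be Sync.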

    PS: also remove the type annotation in main, since it would turn the value back into a BoxStream, which is not Sync:

        let stream: BoxStream<'static, Entry> = api_function();
    // should become:
        let stream = api_function(); // after the above change it's not BoxStream anymore!
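
If the BoxStream comes from a library whose signature you cannot change, as the question describes, one possible workaround is to wrap the value in something that is Sync on its own. The sketch below relies on `std::sync::Mutex<T>` being Sync whenever `T: Send`. Again `Iterator` stands in for `Stream`, and `SyncIter`, `library_values`, `accept` and `wrapped_demo` are hypothetical names. A real version would have to implement `Stream::poll_next` on the wrapper instead, which is not shown here.

```rust
use std::sync::Mutex;

// Hypothetical wrapper: `Mutex<I>` is `Sync` whenever `I: Send`, so wrapping
// a `Send`-only value yields a `Send + Sync` one.
pub struct SyncIter<I>(Mutex<I>);

impl<I> SyncIter<I> {
    pub fn new(inner: I) -> Self {
        SyncIter(Mutex::new(inner))
    }
}

impl<I: Iterator> Iterator for SyncIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // `&mut self` already proves exclusive access, so `get_mut` does not
        // lock; a poisoned mutex still hands back the inner value.
        self.0
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .next()
    }
}

// Stand-in for an API that demands `Send + Sync + 'static`.
fn accept<T>(values: T) -> Vec<String>
where
    T: Iterator<Item = String> + Send + Sync + 'static,
{
    values.collect()
}

// Stand-in for the external library: the return type promises `Send` only.
pub fn library_values() -> Box<dyn Iterator<Item = String> + Send + 'static> {
    Box::new(vec![String::from("value1"), String::from("value2")].into_iter())
}

pub fn wrapped_demo() -> Vec<String> {
    // Passing `library_values()` directly would be rejected (not `Sync`);
    // the wrapper satisfies the bound without touching the library.
    accept(SyncIter::new(library_values()))
}
```

The wrapper never locks the mutex at runtime; it only borrows the Sync guarantee that Mutex provides, and reaches the inner value through the exclusive `&mut self` access that iteration already requires.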