
Rust async issues with local variables


I have a method like this:

    pub async fn build_incoming_stream(
        &mut self,
    ) -> impl Stream<
        Item = Result<
            SslStream<<tokio::net::tcp::Incoming<'_> as TryStream>::Ok>,
            Box<dyn Error + Send + Sync + 'static>,
        >,
    > {
        let addr = self
            .address
            .parse::<SocketAddr>()
            .expect("Couldn't parse socket address");

        let mut listener = TcpListener::bind(addr)
            .await
            .expect("Couldn't bind address");
        let acceptor = self
            .configure_acceptor()
            .await
            .expect("configuring acceptor failed");

        incoming(listener.incoming(), acceptor.clone())
    }

Where fn incoming is:

    pub fn incoming<S>(
        incoming: S,
        acceptor: SslAcceptor,
    ) -> impl Stream<Item = Result<SslStream<S::Ok>, StreamError>>
    where
        S: TryStream + Unpin,
        S::Ok: AsyncRead + AsyncWrite + Send + Sync + Debug + Unpin + 'static,
        S::Error: Into<StreamError>,
    {
        let mut incoming = incoming;

        try_stream! {
            while let Some(stream) = incoming.try_next().await? {
                let tls = tokio_openssl::accept(&acceptor, stream).await?;

                let ssl = SslStream { inner: tls };

                yield ssl;
            }
        }
    }

The compiler keeps complaining that I can't return a value referencing local data, on the last line of the build_incoming_stream fn:

    cannot return value referencing local data `listener`
    returns a value referencing data owned by the current function

I've tried wrapping it in an async move block to capture listener, but with no luck. Any ideas how I could resolve or work around this issue?


Solution

  • It seems that TcpListener already implements Stream, so the listener itself can be passed to incoming by value and this code works:

    pub async fn build_incoming_stream(
        &mut self,
    ) -> impl Stream<
        Item = Result<
            SslStream<<tokio::net::tcp::Incoming<'_> as TryStream>::Ok>,
            Box<dyn Error + Send + Sync + 'static>,
        >,
    > {
        let addr = self
            .address
            .parse::<SocketAddr>()
            .expect("Couldn't parse socket address");

        let acceptor = self
            .configure_acceptor()
            .await
            .expect("configuring acceptor failed");

        incoming(
            TcpListener::bind(addr)
                .await
                .expect("Couldn't bind address"),
            acceptor.clone(),
        )
    }

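    The issue with the code posted in the question was in TcpListener::incoming:

    pub fn incoming(&mut self) -> Incoming<'_> {
        Incoming::new(self)
    }

    Because incoming takes the TCP listener by mutable reference, the returned Incoming<'_> borrows listener, which is a local variable that is dropped when build_incoming_stream returns. The returned stream would outlive the data it references, which is exactly what the compiler rejects. Passing the listener by value moves ownership into the returned stream instead.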
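
The same ownership problem can be reproduced without tokio. Below is a minimal sketch using only the standard library and iterators in place of streams. Listener, Incoming, accept, and build_owned are hypothetical stand-ins made up for illustration, not tokio's types or API; only the shape of incoming(&mut self) -> Incoming<'_> mirrors the method quoted above.

```rust
// Hypothetical stand-in for a listener that hands out connection ids.
struct Listener {
    next_id: u32,
    limit: u32,
}

// Borrowing adapter, analogous in shape to Incoming<'_>:
// it holds a mutable reference to the listener it came from.
struct Incoming<'a> {
    listener: &'a mut Listener,
}

impl Listener {
    fn new(limit: u32) -> Self {
        Listener { next_id: 0, limit }
    }

    // Yields ids 1..=limit, then None.
    fn accept(&mut self) -> Option<u32> {
        if self.next_id < self.limit {
            self.next_id += 1;
            Some(self.next_id)
        } else {
            None
        }
    }

    // Same signature shape as the method from the answer.
    fn incoming(&mut self) -> Incoming<'_> {
        Incoming { listener: self }
    }
}

impl<'a> Iterator for Incoming<'a> {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.listener.accept()
    }
}

// The listener itself is also an iterator, so it can be moved
// into an adapter instead of being borrowed.
impl Iterator for Listener {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        self.accept()
    }
}

// This version is rejected with E0515, because the returned value
// borrows `listener`, which is dropped when the function returns:
//
// fn build_borrowed() -> impl Iterator<Item = u32> {
//     let mut listener = Listener::new(3);
//     listener.incoming()
// }

// Moving the listener into the returned iterator compiles,
// since the iterator now owns everything it needs.
fn build_owned(limit: u32) -> impl Iterator<Item = u32> {
    let listener = Listener::new(limit);
    listener.map(|id| id * 10)
}

fn main() {
    let owned: Vec<u32> = build_owned(3).collect();
    println!("{:?}", owned); // prints [10, 20, 30]

    // Borrowing is fine while the caller keeps the listener alive.
    let mut listener = Listener::new(2);
    let borrowed: Vec<u32> = listener.incoming().collect();
    println!("{:?}", borrowed); // prints [1, 2]
}
```

The borrowing adapter is still usable whenever the caller owns the listener for at least as long as the adapter, as in main above. It only fails when the adapter has to escape the function that created the listener.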