Search code examples
rustfuture

Why does my Future implementation get stuck at the start?


Although the code compiles, I don't understand why when using await directly it gets stuck in the first request. Why do I need to use the method execute_requests instead of calling it on the Future implementation?

// ...

async fn send(url: &str) {
    println!("Sending to URL {}", url);
    // Simulate the sending process
    sleep(Duration::from_millis(500)).await;
}

type Request = Pin<Box<dyn Future<Output = ()>>>;

struct Proxy;

impl Proxy {
    async fn execute_requests(&self) {
        let request_1 = async {
            send("url 1").await;
        };
        let request_2 = async {
            send("url 2").await;
        };

        let mut requests: Vec<Request> = vec![];
        requests.push(Box::pin(request_2));
        requests.push(Box::pin(request_1));

        while let Some(request) = requests.pop() {
            request.await;
        }
    }
}

impl Future for Proxy {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut queue_process = Box::pin(self.execute_requests());
        queue_process.as_mut().poll(cx)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let proxy = Proxy;
    // Executes both requests
    // Ok(proxy.execute_requests().await)
    // FIXME: Timeouts on the first request
    Ok(proxy.await)
}

Rust Playground

execute_requests is a simplification: it needs access to self to get the requests and other things.


Solution

  • Each time you poll, you create a new execute_requests() future and poll it once. It will never advance to the next poll - next time your poll() is called, a new future will be created and will be polled once, and so on.

    Instead, you should store the execute_requests() future within Proxy, and poll the same future:

    struct Proxy(Pin<Box<dyn Future<Output = ()>>>);
    
    impl Proxy {
        fn new() -> Self {
            Self(Box::pin(Self::execute_requests()))
        }
        
        async fn execute_requests() {
            let request_1 = async {
                send("url 1").await;
            };
            let request_2 = async {
                send("url 2").await;
            };
    
            let mut requests: Vec<Request> = vec![];
            requests.push(Box::pin(request_2));
            requests.push(Box::pin(request_1));
    
            while let Some(request) = requests.pop() {
                request.await;
            }
        }
    }
    
    impl Future for Proxy {
        type Output = ();
    
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            self.0.as_mut().poll(cx)
        }
    }
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let proxy = Proxy::new();
        // Executes both requests
        // Ok(proxy.execute_requests().await)
        // FIXME: Timeouts on the first request
        Ok(proxy.await)
    }
    

    Playground.