Search code examples
rustrust-tokiorust-futures

Appending to a vector inside a buffered future - Rust


The following code fails with

cannot move out of `my_number_vector`, a captured variable in an `FnMut` closure
move out of `my_number_vector`

[dependencies]
futures = "0.3.21"
tokio = "1.19.2"

use futures::{stream, StreamExt};


#[tokio::main]
async fn main() {

let mut my_number_vector = Vec::new();

let the_stream = stream::iter(0..=10);


let response_of_stream = stream_of_urls
            .map(|number| async move {
              number
            })
            .buffer_unordered(2);

 response_of_stream
            .for_each(|number| async move {
               my_number_vector.push(number);
            })
            .await;


}

How is it possible to append to a vector in this scenario? I tried using RC and RefCells but it seems I cannot understand the error accurately in order to solve the compiler error.


Solution

  • The idea of Rc/RefCell was good but, since the async executor may rely on multiple threads, you need the multithreaded equivalent: Arc/Mutex.

    On top of that, there is a tricky part. The closure needs access to the vector inside the Arc/Mutex, then this closure has to be marked with move and the Arc has to be cloned. But, this closure involves an async block which also needs access to the vector inside the Arc/Mutex, then this block also has to be marked with move and the Arc has to be cloned again!

    use futures::{stream, StreamExt};
    use std::sync::{Arc, Mutex};
    
    #[tokio::main]
    async fn main() {
        let my_number_vector = Arc::new(Mutex::new(Vec::<i32>::new()));
    
        let the_stream = stream::iter(0..=10);
    
        let response_of_stream = the_stream // stream_of_urls ???
            .map(|number| async move { number })
            .buffer_unordered(2);
    
        // clone the Arc to be moved into the closure
        let my_vec_arc = Arc::clone(&my_number_vector);
    
        response_of_stream
            .for_each(move |number| {
                // clone again the Arc to be moved into the async block
                let my_vec_arc_2 = Arc::clone(&my_vec_arc);
    
                async move {
                    my_vec_arc_2.lock().unwrap().push(number);
                }
            })
            .await;
        println!("{:?}", my_number_vector.lock().unwrap());
        // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }