Tags: rust, rust-tokio

Running concurrent tasks on each item of a Vec


I have a Vec containing character data, and I want to run the character AI concurrently on each character. This is not for performance reasons, but because each AI has cooldown periods in which it sleeps, allowing other AIs to run.

Each AI is allowed to mutate a CharacterData, but only the one it was assigned to. In theory, then, each AI accesses a separate element of the Vec, and no CharacterData is ever referenced by more than one AI.

I am struggling to express this in Rust, however. How do I:

  1. Split the Vec into multiple mutable references, one for each item.
  2. Spawn a task for each item.
  3. Await all the resulting tasks.

This is what I have so far:

#[derive(Clone, Default)]
struct Actor<TState>(TState);

impl<TState> Actor<TState> {
    pub fn new(initial_state: TState) -> Self {
        Actor(initial_state)
    }

    pub async fn act(&mut self, action: &(impl Action<State = TState> + ?Sized)) {
        action.execute(&mut self.0).await;
    }
}

#[async_trait::async_trait]
trait Action: Send + Sync {
    type State;

    async fn execute(&self, state: &mut Self::State);
}

struct CharacterData {
    x: i32,
    y: i32
}

type CharacterActor = Actor<CharacterData>;

struct RunCharacterAction;
#[async_trait::async_trait]
impl Action for RunCharacterAction {
    type State = CharacterData;
    
    async fn execute(&self, _state: &mut Self::State) {
        todo!()
    }
}

struct WorldData {
    characters: Vec<CharacterActor>
}

type WorldActor = Actor<WorldData>;

struct RunWorldAction;

#[async_trait::async_trait]
impl Action for RunWorldAction {
    type State = WorldData;
    
    async fn execute(&self, state: &mut Self::State) {
        let tasks: Vec<_> = state.characters
            .iter_mut()
            .map(|character| {
                tokio::spawn(async {
                    let action = RunCharacterAction;
                    character.act(&action).await;
                })
            }).collect();
            
        futures::future::join_all(tasks).await;
    }
}

#[tokio::main]
async fn main() {
    let mut world = WorldActor::new(WorldData {
        characters: vec![
            CharacterActor::new(CharacterData { x: 1, y: 2 }),
            CharacterActor::new(CharacterData { x: 3, y: 4 })]
    });
    
    let action = RunWorldAction;
    world.act(&action).await;
}

This produces the following error:

error[E0521]: borrowed data escapes outside of method
  --> src/main.rs:51:29
   |
50 |       async fn execute(&self, state: &mut Self::State) {
   |                               -----  - lifetime `'life1` defined here
   |                               |
   |                               `state` is a reference that is only valid in the method body
51 |           let tasks: Vec<_> = state.characters
   |  _____________________________^
52 | |             .iter_mut()
   | |                       ^
   | |                       |
   | |_______________________`state` escapes the method body here
   |                         argument requires that `'life1` must outlive `'static`

For more information about this error, try `rustc --explain E0521`.
error: could not compile `playground` (bin "playground") due to 1 previous error

Solution

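  • Since you don't need the tasks to run in parallel, you don't need to spawn them. tokio::spawn requires its future to be 'static, so a spawned task cannot borrow from state; that is what E0521 is reporting. join_all is sufficient here: it polls all the futures concurrently inside the current task, so they are free to borrow from state.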

    let tasks = state
        .characters
        .iter_mut()
        .map(|character| async move {
            let action = RunCharacterAction;
            character.act(&action).await;
        });
    
    futures::future::join_all(tasks).await;
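
To see why unspawned futures can hold `&mut` borrows into the Vec and still interleave, here is a minimal sketch that uses only the standard library. Everything in it is a hypothetical stand-in rather than the question's code or any library's API: `Character` replaces `CharacterData`, `Cooldown` imitates a sleep by returning `Pending` once, and `poll_all` is a toy busy-polling loop that plays the role of `join_all` plus the runtime. A real program should keep using `futures::future::join_all` on tokio as shown above.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the question's CharacterData.
#[derive(Debug, PartialEq)]
struct Character {
    x: i32,
    y: i32,
}

// Stand-in for a cooldown sleep: pending on the first poll, ready on the second.
struct Cooldown {
    polled: bool,
}

impl Future for Cooldown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.polled {
            Poll::Ready(())
        } else {
            self.polled = true;
            Poll::Pending
        }
    }
}

// The "AI": mutates only its own character and records when it runs.
async fn run_ai(id: usize, character: &mut Character, log: &RefCell<Vec<String>>) {
    character.x += 1;
    log.borrow_mut().push(format!("{id} start"));
    Cooldown { polled: false }.await; // other AIs get polled while this one waits
    character.y += 1;
    log.borrow_mut().push(format!("{id} end"));
}

// A boxed future that may borrow non-'static data for the lifetime 'a.
type LocalFuture<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;

// Waker that does nothing; enough for a loop that re-polls everything anyway.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy replacement for join_all + a runtime: busy-polls until every future is done.
fn poll_all(mut futures: Vec<LocalFuture<'_>>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !futures.is_empty() {
        // Poll each future once per pass and keep only those still pending.
        futures.retain_mut(|fut| fut.as_mut().poll(&mut cx).is_pending());
    }
}

// iter_mut hands out one disjoint &mut per element; no spawning, no 'static bound.
fn run_world(characters: &mut [Character]) -> Vec<String> {
    let log = RefCell::new(Vec::new());
    let futures: Vec<LocalFuture<'_>> = characters
        .iter_mut()
        .enumerate()
        .map(|(id, character)| Box::pin(run_ai(id, character, &log)) as LocalFuture<'_>)
        .collect();
    poll_all(futures);
    log.into_inner()
}

fn main() {
    let mut characters = vec![Character { x: 1, y: 2 }, Character { x: 3, y: 4 }];
    let log = run_world(&mut characters);
    println!("{log:?}");
    println!("{characters:?}");
}
```

With two characters the log comes out as `["0 start", "1 start", "0 end", "1 end"]`: the second AI starts while the first is still in its cooldown, even though nothing was spawned and everything runs on one thread. The busy loop is only for illustration; a real executor waits to be woken instead of re-polling continuously.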