Running a method on self inside a tokio task


I have created a very simplified example of the code I am having an issue with:

use core::time;
use std::sync::Arc;
use std::thread;
use tokio::sync::{mpsc::Receiver, RwLock};

struct MyStruct {
    counter: Arc<RwLock<i32>>,
    rx: RwLock<Receiver<i32>>,
}

impl MyStruct {
    async fn start_here(&self) { // <--------- Lifetime error here on self
        while let Some(message) = self.rx.write().await.recv().await {
            tokio::spawn(self.do_some_work_then_update_counter());
        }
    }

    async fn do_some_work_then_update_counter(&self) {
        let dur = time::Duration::from_millis(10000);
        thread::sleep(dur);
        let mut counter = self.counter.write().await;
        *counter += 1;
    }
}

A receiver gets messages from another part of the program, and I want to process each message in its own task so that handling one message does not block the next.

As you can imagine, this is a lifetime error: tokio::spawn requires a 'static future, and the spawned task could outlive self.

One solution I have tried is this:

impl MyStruct {
    async fn start_here(&self) {
        while let Some(message) = self.rx.write().await.recv().await {
            let counter = self.counter.clone();
            tokio::spawn(do_some_work_then_update_counter(counter));
        }
    }
}

async fn do_some_work_then_update_counter(counter: Arc<RwLock<i32>>) {
    let dur = time::Duration::from_millis(10000);
    thread::sleep(dur);
    let mut counter = counter.write().await;
    *counter += 1;
}

This doesn't seem like a good option, though. I want to keep do_some_work_then_update_counter as a method of MyStruct rather than a free function, since it modifies data on MyStruct.

Is there a better solution?


Solution

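  • You can, if you return impl Future directly instead of declaring an async fn. Clone the Arc before the async block so that the returned future owns its data and does not borrow self: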

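    // Requires: use std::future::Future;
    fn do_some_work_then_update_counter(&self) -> impl Future<Output = ()> {
        // Clone the Arc eagerly so the future owns it and does not borrow self.
        let counter = Arc::clone(&self.counter);
        async move {
            // Simulated work. thread::sleep blocks the runtime's worker thread;
            // tokio::time::sleep(dur).await is the non-blocking alternative.
            let dur = time::Duration::from_millis(10000);
            thread::sleep(dur);
            let mut counter = counter.write().await;
            *counter += 1;
        }
    }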
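
    For a quick check without pulling in tokio, here is a minimal std-only sketch of the same pattern. Everything beyond MyStruct and do_some_work_then_update_counter is an assumption made for illustration: spawn_like is a hypothetical stand-in that has the same Send + 'static bounds as tokio::spawn but runs the future on a plain thread, block_on is a tiny hand-written executor, run_demo is a made-up driver, and the counter uses std::sync::Mutex in place of tokio's RwLock.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor (illustrative only): polls one future to completion
/// on the current thread, parking between polls.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical stand-in for `tokio::spawn`: same `Send + 'static` bounds,
/// but the future is driven on a dedicated OS thread.
fn spawn_like<F>(fut: F) -> JoinHandle<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    thread::spawn(move || block_on(fut))
}

struct MyStruct {
    counter: Arc<Mutex<i32>>,
}

impl MyStruct {
    // A plain fn returning a future: the Arc is cloned eagerly, so the
    // returned future owns everything it needs and does not borrow `self`.
    fn do_some_work_then_update_counter(&self) -> impl Future<Output = ()> + Send + 'static {
        let counter = Arc::clone(&self.counter);
        async move {
            *counter.lock().unwrap() += 1;
        }
    }
}

/// Spawns three tasks from `&self` and returns the final counter value.
fn run_demo() -> i32 {
    let s = MyStruct { counter: Arc::new(Mutex::new(0)) };
    let handles: Vec<_> = (0..3)
        .map(|_| spawn_like(s.do_some_work_then_update_counter()))
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = *s.counter.lock().unwrap();
    total
}
```

    Calling run_demo() spawns three tasks and returns the final counter value. With tokio, spawn_like would be replaced by tokio::spawn and the Mutex by the original RwLock. Spelling out + Send + 'static in the return type makes the compiler check the bound at the method definition rather than only at the spawn call.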