Tags: multithreading, rust, mutable, spawn, ownership

Start a forever thread with an expired-objects loop, removing those objects, in Rust


I am writing a service, and I want to run a loop forever that checks for some expired objects and removes them if too old.

pub struct Obj {
    expired: NaiveDateTime,
}

pub struct Maintainer {
    objs: HashMap<id, Obj>,
}

pub trait Miller {
    fn new() -> Self;
}

impl Miller for Maintainer {
    fn new() -> Self {
        let i = Self {
            objs: HashMap::new(),
        };
        i.start_exp_observer();
        i
    }
}

impl Maintainer {
    fn start_exp_observer(&self) {
        let observer = thread::spawn(move || loop {
            thread::sleep(sleep_time);
            self.objs
                .retain(|_, o| o.expired.gt(&Utc::now().naive_utc()));
        });
        // does this even work here
        observer.join().unwrap();
    }
}

In Rust, this does not work, because I am using an immutable self in start_exp_observer, which is the one created by new().

I tried fn start_exp_observer(&mut self) but that puts me in trouble, as the Self created in new is not mutable. And if I define that Self as mutable, then I get trouble with the trait.

And it feels like the more I try, the more trouble I get (if I cloned Self before returning the object, for example, then I guess I would not be looking at the same objects in the thread).

How can this be done?


Solution

  • You need to make sure the data can be shared between threads and that only one thread accesses it at a time, i.e. wrap it in an Arc and a Mutex.

    use chrono::*;
    use std::collections::HashMap;
    
    use std::sync::{Arc, Mutex};
    
    pub struct Obj {
        expired: NaiveDateTime,
    }
    
    #[derive(Clone)]
    pub struct Maintainer {
        objs: Arc<Mutex<HashMap<u64, Obj>>>,
        //    ^   ^     ^
        //    |   |     |
        //    |   |     L the actual data
        //    |   L ensures only one thread accesses the data at a time
        //    L lets the data be shared between threads
    }
    
    pub trait Miller {
        fn new() -> Self;
    }
    
    impl Miller for Maintainer {
        fn new() -> Self {
            let objs = Default::default();
            let i = Self { objs };
            i.start_exp_observer();
            i
        }
    }
    
    impl Maintainer {
        fn start_exp_observer(&self) {
            // Clone `self`; the clone shares the same map through the `Arc`
            // and is the value that gets sent to the other thread.
            let maintainer = self.clone();
            // Spawn a new thread; `move` moves the `maintainer` variable into it
            // while `self` stays in the current thread.
            std::thread::spawn(move || loop {
                std::thread::sleep(std::time::Duration::from_millis(1000));
                maintainer
                    .objs
                    .lock()
                    // ^^^^ lock the mutex so that no other thread
                    //      (e.g. the main thread) accesses the map at the same time
                    .unwrap()
                    .retain(|_, o| o.expired.gt(&Utc::now().naive_utc()));
            });
            // Do not `join()` the handle:
            // `join` waits for the thread to finish, and because the thread
            // runs an infinite loop, the main thread would never make progress.
        }
    }
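
    To address the worry in the question that a clone would not look at the same objects: cloning an Arc only copies the pointer, so the clone and the original share one map. Below is a minimal sketch of the same pattern that uses only the standard library so it can be compiled without chrono. It assumes `std::time::Instant` as a stand-in for `NaiveDateTime`; the field `expires_at`, the `interval` and `ttl` parameters, and the helpers `insert` and `len` are hypothetical names added for illustration and are not part of the original code.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

pub struct Obj {
    // Stand-in for the `NaiveDateTime` field (assumption: avoids the chrono dependency).
    expires_at: Instant,
}

#[derive(Clone)]
pub struct Maintainer {
    // Cloning `Maintainer` clones the `Arc`, not the map behind it.
    objs: Arc<Mutex<HashMap<u64, Obj>>>,
}

impl Maintainer {
    /// `interval` (hypothetical parameter) is how long the observer sleeps between sweeps.
    pub fn new(interval: Duration) -> Self {
        let m = Self { objs: Arc::default() };
        m.start_exp_observer(interval);
        m
    }

    /// Hypothetical helper: store an object that expires `ttl` from now.
    pub fn insert(&self, id: u64, ttl: Duration) {
        let obj = Obj { expires_at: Instant::now() + ttl };
        self.objs.lock().unwrap().insert(id, obj);
    }

    /// Hypothetical helper: number of objects currently stored.
    pub fn len(&self) -> usize {
        self.objs.lock().unwrap().len()
    }

    fn start_exp_observer(&self, interval: Duration) {
        // The clone shares the same map as `self` through the `Arc`.
        let maintainer = self.clone();
        // The handle is intentionally dropped: the thread keeps running detached.
        thread::spawn(move || loop {
            thread::sleep(interval);
            let now = Instant::now();
            // Keep only the objects whose expiry time is still in the future.
            maintainer
                .objs
                .lock()
                .unwrap()
                .retain(|_, o| o.expires_at > now);
        });
    }
}
```

    With this sketch, creating a `Maintainer` with a short interval, inserting one entry with a short `ttl` and one with a long `ttl`, and checking `len()` after a pause should show that the background thread removed the short-lived entry from the very map the caller sees. Note that the detached thread only stops when the process exits; if you need to shut it down earlier, you would have to add your own stop signal.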