
How to send a message to another actor from actix SyncContext?


I would like to implement a long-running background task that can report progress to other Actors. I have already accomplished that, but I would also like to be able to cancel the task while it is running.

What I got so far is this:

use actix::prelude::*;
use std::{thread, time};

struct Worker {}

impl Actor for Worker {
    type Context = SyncContext<Self>;
}

struct Manager {
    worker: Addr<Worker>,
}

impl Actor for Manager {
    type Context = Context<Self>;
}

impl Supervised for Manager {}

impl SystemService for Manager {
    fn service_started(&mut self, _ctx: &mut Context<Self>) {}
}

struct Work {}

#[derive(Message)]
#[rtype(result = "()")]
struct PerformWork(Work);

#[derive(Message)]
#[rtype(result = "()")]
pub struct ReportProgress(i32);

impl Handler<PerformWork> for Worker {
    type Result = ();

    fn handle(&mut self, msg: PerformWork, ctx: &mut Self::Context) -> Self::Result {
        for i in 0..10000000 {
            // Report progress
            Manager::from_registry().do_send(ReportProgress(i));
            // Do some very slow I/O.
            thread::sleep(time::Duration::from_millis(1));
        }
    }
}

impl Handler<ReportProgress> for Manager {
    type Result = ();

    fn handle(&mut self, msg: ReportProgress, ctx: &mut Self::Context) -> Self::Result {
        // Do something with the progress here
    }
}

The Manager also handles a Message that sends the PerformWork Message to the Worker.

I thought of giving the ReportProgress Message a bool return type that would allow the Worker to decide whether it should break out of its loop. However, I cannot manage to send a Message with a return value to the Manager. Using send() instead of do_send() returns a Future that I cannot resolve within the SyncContext.

Any ideas are very much appreciated.

A bit more background:

  • the really slow I/O is serial communication.
  • actix is version 0.10

Solution

  • I found a solution, but I am not convinced that it is a good one.

    I added an Arc<AtomicBool> that is passed to the Worker as part of the PerformWork message. The Manager keeps a clone of the Arc and can set the flag. The Worker breaks out of its loop once the Manager has set the flag.

    use actix::prelude::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::{thread, time};
    
    struct Worker {}
    
    impl Actor for Worker {
        type Context = SyncContext<Self>;
    }
    
    struct Manager {
        worker: Addr<Worker>,
    }
    
    impl Actor for Manager {
        type Context = Context<Self>;
    }
    
    impl Supervised for Manager {}
    
    impl SystemService for Manager {
        fn service_started(&mut self, _ctx: &mut Context<Self>) {}
    }
    
    struct Work {}
    
    #[derive(Message)]
    #[rtype(result = "()")]
    struct PerformWork(Work, Arc<AtomicBool>);
    
    #[derive(Message)]
    #[rtype(result = "()")]
    pub struct ReportProgress(i32);
    
    impl Handler<PerformWork> for Worker {
        type Result = ();
    
        fn handle(&mut self, msg: PerformWork, ctx: &mut Self::Context) -> Self::Result {
            for i in 0..10000000 {
                // Report progress
                Manager::from_registry().do_send(ReportProgress(i));
                if msg.1.load(Ordering::Relaxed) {
                    break;
                }
                // Do some very slow I/O.
                thread::sleep(time::Duration::from_millis(1));
            }
        }
    }
    
    impl Handler<ReportProgress> for Manager {
        type Result = ();
    
        fn handle(&mut self, msg: ReportProgress, ctx: &mut Self::Context) -> Self::Result {
            // Do something with the progress here
        }
    }
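
    The flag handshake itself can be tried without actix. Below is a minimal sketch using only the standard library: a plain thread stands in for the Worker, an mpsc channel stands in for ReportProgress, and the main thread plays the Manager. The function run_worker and the step counts are hypothetical names and values chosen for illustration; they are not part of the code above. Unlike the loop above, this sketch checks the flag before each slow step rather than after reporting.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the Worker's loop.
/// Checks the cancel flag before each slow step and returns the
/// number of steps that were actually completed.
fn run_worker(cancel: &AtomicBool, progress: &mpsc::Sender<i32>, steps: i32) -> i32 {
    let mut done = 0;
    for i in 0..steps {
        // Stop before starting another slow step if cancellation was requested.
        if cancel.load(Ordering::Relaxed) {
            break;
        }
        // Stand-in for the slow serial I/O.
        thread::sleep(Duration::from_millis(1));
        done += 1;
        // Report progress; ignore the error if the receiver has gone away.
        let _ = progress.send(i);
    }
    done
}

fn main() {
    let cancel = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel();

    // The worker thread gets its own clone of the Arc, like the
    // Arc carried inside the PerformWork message.
    let worker_flag = Arc::clone(&cancel);
    let worker = thread::spawn(move || run_worker(&worker_flag, &tx, 10_000));

    // The "manager" side: consume progress and cancel after a few reports.
    for i in rx.iter() {
        if i >= 5 {
            cancel.store(true, Ordering::Relaxed);
            break;
        }
    }

    let done = worker.join().unwrap();
    println!("worker stopped after {} steps", done);
}
```

    Ordering::Relaxed is enough here because the flag is the only thing communicated through the atomic; the worker may run one or a few extra steps before it observes the store, so the exact step count printed varies between runs.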