Akka.NET - How to wait for a child actor to process all pending messages before stopping


We have a cluster-sharded actor named A with a hierarchy of child actors beneath it, created with the child-per-entity pattern, as shown below. Suppose we Tell 100 messages from actor B to actor D, and D takes about 500 ms to process each one. If, in the meantime, we send the poison pill to actor A using Context.Parent.Tell(new Passivate(PoisonPill.Instance)); all child actors, including D, stop immediately without processing their pending messages.

    A
    |
    B    
   / \
  C   D

Is there a way to wait for actor D to process all the messages?


Solution

  • https://stackoverflow.com/a/70286526/377476 is a good start; you will need a custom shutdown message. When a parent actor terminates, its children are automatically killed via /system messages, which supersede any unprocessed /user messages in their queues.
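
    The ordering difference can be seen in a small sketch. It assumes a hypothetical SlowWorker actor and a console program that are not part of the original sample: one worker is stopped with GracefulStop, whose default PoisonPill is an ordinary /user message, and the other with ActorSystem.Stop, which is delivered as a /system message.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using Akka.Actor;

    // Hypothetical worker that takes a while to handle each message.
    public sealed class SlowWorker : ReceiveActor
    {
        public SlowWorker()
        {
            ReceiveAsync<int>(async i =>
            {
                await Task.Delay(50);
                Console.WriteLine($"{Self.Path.Name} processed {i}");
            });
        }
    }

    public static class Program
    {
        public static async Task Main()
        {
            var system = ActorSystem.Create("demo");
            var drained = system.ActorOf(Props.Create(() => new SlowWorker()), "drained");
            var cutOff = system.ActorOf(Props.Create(() => new SlowWorker()), "cut-off");

            for (var i = 0; i < 5; i++)
            {
                drained.Tell(i);
                cutOff.Tell(i);
            }

            // PoisonPill (the default stop message) is queued behind the five ints.
            var drainedStopped = drained.GracefulStop(TimeSpan.FromSeconds(5));

            // Stop is handled as a system message, ahead of queued user messages.
            system.Stop(cutOff);

            await drainedStopped;
            await system.Terminate();
        }
    }
    ```

    In a run of this sketch, "drained" should log all five values, while "cut-off" will usually stop with most of its queue unprocessed. The exact number it handles depends on timing.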

    So what you need to do is ensure that all of their /user messages are processed prior to the parent terminating itself. There's a straightforward way to do this using the GracefulStop extension method in combination with your custom stop message:

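    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using Akka.Actor;
    using Akka.Event;

    // Message types used by the sample; their exact shape is assumed here
    public sealed class StartWork{
        public StartWork(int workCount){ WorkCount = workCount; }
        public int WorkCount { get; }
    }

    public sealed class MyStopMessage{ }

    public sealed class ActorA : ReceiveActor{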
        private IActorRef _actorB;  
        
        private readonly ILoggingAdapter _log = Context.GetLogger();
        
        public ActorA(){
            Receive<StartWork>(w => {
                foreach(var i in Enumerable.Range(0, w.WorkCount)){
                    _actorB.Tell(i);
                }
            });
            
            ReceiveAsync<MyStopMessage>(async _ => {
                _log.Info("Begin shutdown");
                
                // stop child actor B with the same custom message
                await _actorB.GracefulStop(TimeSpan.FromSeconds(10), _);
                
                // shut ourselves down after child is done
                Context.Stop(Self);
            });
        }
        
        protected override void PreStart(){
            _actorB = Context.ActorOf(Props.Create(() => new ActorB()), "b");
        }
    }
    
    public sealed class ActorB : ReceiveActor{
        private IActorRef _actorC;
        private IActorRef _actorD;
        
        private readonly ILoggingAdapter _log = Context.GetLogger();
        
        public ActorB(){
            Receive<int>(i => {
                _actorC.Tell(i);
                _actorD.Tell(i);
            });
            
            ReceiveAsync<MyStopMessage>(async _ => {
                
                _log.Info("Begin shutdown");
                
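                // stop both actors in parallel; without an explicit stop message,
                // GracefulStop sends a PoisonPill, which queues behind pending work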
                var stopC = _actorC.GracefulStop(TimeSpan.FromSeconds(10));
                var stopD = _actorD.GracefulStop(TimeSpan.FromSeconds(10));
                
                // compose stop Tasks
                var bothStopped = Task.WhenAll(stopC, stopD);
                await bothStopped;
                
                // shut ourselves down immediately
                Context.Stop(Self);
            });
        }
        
        protected override void PreStart(){
            var workerProps = Props.Create(() => new WorkerActor());
            _actorC = Context.ActorOf(workerProps, "c");
            _actorD = Context.ActorOf(workerProps, "d");
        }
    }
    
    public sealed class WorkerActor : ReceiveActor {
        private readonly ILoggingAdapter _log = Context.GetLogger();
        
        public WorkerActor(){
            ReceiveAsync<int>(async i => {
                await Task.Delay(10);
                _log.Info("Received {0}", i);
            });
        }
    }
    

    I've created a runnable version of this sample here: https://dotnetfiddle.net/xiGyWM. You'll see that the MyStopMessage instances are received shortly after the sample starts, but only after C and D have been given work. All of that work completes before any actor terminates in this scenario.
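
    To tie this back to the sharded setup in the question, the entity can ask its Shard to passivate it with the custom message instead of PoisonPill. The following is a minimal sketch, not the code from the fiddle: ShardedEntity and WorkFinished are hypothetical names, and the shape of MyStopMessage is assumed.

    ```csharp
    using System;
    using Akka.Actor;
    using Akka.Cluster.Sharding;

    // Assumed shape of the custom stop message used in the sample above.
    public sealed class MyStopMessage { }

    // Hypothetical trigger; in practice this might be a ReceiveTimeout or a domain event.
    public sealed class WorkFinished { }

    // Hypothetical sharded entity playing the role of actor A.
    public sealed class ShardedEntity : ReceiveActor
    {
        private readonly IActorRef _child;

        public ShardedEntity(Props childProps)
        {
            _child = Context.ActorOf(childProps, "b");

            // Ask the parent Shard to passivate this entity. The Shard sends
            // the wrapped message back to us as a normal user message.
            Receive<WorkFinished>(_ =>
                Context.Parent.Tell(new Passivate(new MyStopMessage())));

            ReceiveAsync<MyStopMessage>(async stop =>
            {
                // Forward the same stop message down and wait for the child to terminate.
                await _child.GracefulStop(TimeSpan.FromSeconds(10), stop);
                Context.Stop(Self);
            });
        }
    }
    ```

    This only works if the child handles MyStopMessage and eventually stops itself, as ActorB does above. Otherwise GracefulStop will not complete successfully within its timeout.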