We have a clustered sharded actor named A and it has multiple child actors created with the child per entity pattern as shown below. When we Tell 100 messages from actor B to D and actor D takes say, 500 ms to process each message, at the same time, when we send the poison pill to actor A using Context.Parent.Tell (new Passivate (PoisonPill.Instance )); It immediately stops all child actors, including actor D, without processing pending messages.
A
|
B
/ \
C D
Is there a way to wait for actor D to process all the messages?
https://stackoverflow.com/a/70286526/377476 is a good start; you will need a custom shutdown message. When a parent actor terminates, it's children are automatically killed via /system
messages which supersede any unprocessed /user
messages in their queue.
So what you need to do is ensure that all of their /user
messages are processed prior to the parent terminating itself. There's a straightforward way to do this using the GracefulStop
extension method in combination with your custom stop message:
public sealed class ActorA : ReceiveActor{
private IActorRef _actorB;
private readonly ILoggingAdapter _log = Context.GetLogger();
public ActorA(){
Receive<StartWork>(w => {
foreach(var i in Enumerable.Range(0, w.WorkCount)){
_actorB.Tell(i);
}
});
ReceiveAsync<MyStopMessage>(async _ => {
_log.Info("Begin shutdown");
// stop child actor B with the same custom message
await _actorB.GracefulStop(TimeSpan.FromSeconds(10), _);
// shut ourselves down after child is done
Context.Stop(Self);
});
}
protected override void PreStart(){
_actorB = Context.ActorOf(Props.Create(() => new ActorB()), "b");
}
}
public sealed class ActorB : ReceiveActor{
private IActorRef _actorC;
private IActorRef _actorD;
private readonly ILoggingAdapter _log = Context.GetLogger();
public ActorB(){
Receive<int>(i => {
_actorC.Tell(i);
_actorD.Tell(i);
});
ReceiveAsync<MyStopMessage>(async _ => {
_log.Info("Begin shutdown");
// stop both actors in parallel
var stopC = _actorC.GracefulStop(TimeSpan.FromSeconds(10));
var stopD = _actorD.GracefulStop(TimeSpan.FromSeconds(10));
// compose stop Tasks
var bothStopped = Task.WhenAll(stopC, stopD);
await bothStopped;
// shut ourselves down immediately
Context.Stop(Self);
});
}
protected override void PreStart(){
var workerProps = Props.Create(() => new WorkerActor());
_actorC = Context.ActorOf(workerProps, "c");
_actorD = Context.ActorOf(workerProps, "d");
}
}
public sealed class WorkerActor : ReceiveActor {
private readonly ILoggingAdapter _log = Context.GetLogger();
public WorkerActor(){
ReceiveAsync<int>(async i => {
await Task.Delay(10);
_log.Info("Received {0}", i);
});
}
}
I've created a runnable version of this sample here: https://dotnetfiddle.net/xiGyWM - you'll see that the MyStopMessage
s are received not long after the sample starts, but after C and D have been given work. All of that work completes before any actors terminate in this scenario.