I have actors that look as follows:
As you can see in the image, the ActorStream is a child of Actor.
The question is: when I terminate the Actor, will the ActorStream also be terminated?
Here is how I create the ActorStream in an Actor:
def create(fsm: ActorRef[ServerHealth], cancel: Option[Cancellable]): Behavior[ServerHealthStreamer] =
  Behaviors.setup { context =>
    implicit val system = context.system
    implicit val materializer = ActorMaterializer()
    implicit val dispatcher = materializer.executionContext

    val kafkaServer = system
      .settings
      .config
      .getConfig("kafka")
      .getString("servers")

    val sink: Sink[ServerHealth, NotUsed] = ActorSink.actorRefWithAck[ServerHealth, ServerHealthStreamer, Ack](
      ref = context.self,
      onCompleteMessage = Complete,
      onFailureMessage = Fail.apply,
      messageAdapter = Message.apply,
      onInitMessage = Init.apply,
      ackMessage = Ack)

    val cancel = Source.tick(1.seconds, 15.seconds, NotUsed)
      .flatMapConcat(_ => Source.fromFuture(health(kafkaServer)))
      .map {
        case true =>
          KafkaActive
        case false =>
          KafkaInactive
      }
      .to(sink)
      .run()

    Behaviors.receiveMessage {
      case Init(ackTo) =>
        ackTo ! Ack
        Behaviors.same
      case Message(ackTo, msg) =>
        fsm ! msg
        ackTo ! Ack
        create(fsm, Some(cancel))
      case Complete =>
        Behaviors.same
      case Fail(_) =>
        fsm ! KafkaInactive
        Behaviors.same
    }
  }
In your case, terminating the actor should also terminate the stream: under the hood, the sink's stage actor watches the ActorRef you passed in and completes the stage when a Terminated message arrives.
I think you can find more information here: https://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/
An extremely important aspect to understand is that the materialized stream is running as a set of actors on the threads of the execution context on which they were allocated. In other words, the stream is running independently from the actor that allocated it. This becomes very important if the stream is long-running, or even infinite, and we want the actor to manage the life-cycle of the stream, such that when the actor stops, the stream is terminated. Expanding on the example above, I will make the stream infinite and use a KillSwitch to manage the life-cycle of the stream.
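The KillSwitch approach described in the quoted passage could look roughly like the sketch below. It is only an illustration under some assumptions: it targets Akka 2.6 or later (where an implicit typed ActorSystem is enough to materialize a stream), the names TickStreamer, Command and Stop are hypothetical, and Sink.ignore stands in for the real sink from the question.

```scala
import akka.NotUsed
import akka.actor.typed.{ActorSystem, Behavior, PostStop}
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._

// Hypothetical actor that owns an infinite tick stream.
object TickStreamer {
  sealed trait Command
  case object Stop extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup { context =>
      // Assumption: Akka 2.6+, where the system provides the materializer implicitly.
      implicit val system: ActorSystem[Nothing] = context.system

      // Keep the kill switch as the materialized value so the actor can stop the stream.
      val killSwitch: UniqueKillSwitch =
        Source.tick(1.second, 15.seconds, NotUsed)
          .viaMat(KillSwitches.single)(Keep.right)
          .to(Sink.ignore) // placeholder for the real sink
          .run()

      Behaviors
        .receiveMessage[Command] {
          case Stop => Behaviors.stopped
        }
        .receiveSignal {
          // PostStop is delivered whenever the actor stops, for any reason.
          case (_, PostStop) =>
            killSwitch.shutdown()
            Behaviors.same
        }
    }
}
```

Shutting the switch down in the PostStop handler ties the stream's life-cycle to the actor explicitly, so it does not depend on which sink is used or on how the materializer was created.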