I'm using the Akka Streams Kafka library to interact with a Kafka broker. I have the following stream definition with an unbounded buffer:
def producerStream[T: MessageType](producerProperties: Map[String, String]) = {
  val streamSource = source[T](producerProperties)
  val streamFlow = flow[T](producerProperties)
  val streamSink = sink(producerProperties)
  streamSource.via(streamFlow).to(streamSink).run()
}
I call the producerStream from within an Actor instance. I have a message of type:
case class InitiateStream[T](publisher: ActorRef, anyMessage: T)
In my Actor, I have the following Receive block:
override def receive: Receive = super.receive orElse {
  case StartProducerStream(publisherActor, DefaultMessage) =>
    producerStream[DefaultMessage](cfg.producerProps)
    // context.become(active)
  case other => println(s"SHIT !! Got unknown message: $other")
}
The publisherActor will eventually get the messages that are supposed to be pushed to the respective Kafka topic.
Now my question is, do I have to bother about closing the producerStream?
I was thinking about calling context.become(active(producerStream)) as soon as I start the stream, and then handling stream termination in the active method when a DestroyStream message arrives. Is this needed? What do you guys think?
If you use the Akka Kafka consumer, the source keeps emitting events for as long as events are available in the topic. Why do you need to close the producer?
If you do need to stop it, I'd suggest building and starting your source like this, so that you get a KillSwitch back:
def producerSource[T: MessageType](producerProperties: Map[String, String]) = {
  val streamSource = source[T](producerProperties)
  val streamFlow = flow[T](producerProperties)
  // Not attached here: startStream below connects its own sink.
  val streamSink = sink(producerProperties)
  streamSource.via(streamFlow)
}
def startStream[A](source: Source[A, NotUsed])(
    implicit am: ActorMaterializer): (UniqueKillSwitch, Future[Done]) = {
  source
    .viaMat(KillSwitches.single)(Keep.right)
    .toMat(Sink.ignore)(Keep.both)
    .run()
}
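var streamHandler: Option[UniqueKillSwitch] = None
override def receive: Receive = super.receive orElse {
  case StartProducerStream(publisherActor, DefaultMessage) =>
    // startStream needs an implicit ActorMaterializer in scope.
    // Keep only the kill switch from the materialized (switch, future) pair.
    val (killSwitch, _) = startStream(producerSource[DefaultMessage](cfg.producerProps))
    streamHandler = Some(killSwitch)
    context.become(active)
  case other => println(s"SHIT !! Got unknown message: $other")
}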
and for the active behavior:
val active: PartialFunction[Any, Unit] = {
  case DestroyStream =>
    // Shut the stream down if one is running, then forget the switch.
    streamHandler.foreach(_.shutdown())
    streamHandler = None
}
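To see the KillSwitch part in isolation, here is a minimal sketch without Kafka or actors. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream (on older versions you would pass an ActorMaterializer as above). KillSwitchSketch, runAndStop and the tick source are made-up stand-ins, not part of the code in the question:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object KillSwitchSketch {
  // Starts a stream that never ends on its own, stops it through the
  // kill switch, and returns once the stream has completed.
  def runAndStop()(implicit system: ActorSystem): Done = {
    val (killSwitch, done) =
      Source.tick(0.millis, 100.millis, "tick") // hypothetical stand-in for the Kafka-bound source
        .viaMat(KillSwitches.single)(Keep.right) // keep the UniqueKillSwitch
        .toMat(Sink.ignore)(Keep.both)           // keep (switch, Future[Done])
        .run()

    // shutdown() completes downstream and cancels upstream,
    // which in turn completes the Future[Done] from Sink.ignore.
    killSwitch.shutdown()
    Await.result(done, 3.seconds)
  }
}
```

The Await is only there to keep the sketch short. Inside an actor you would rather react to the Future[Done] (for example with pipeTo) than block on it.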