Search code examples
scalaapache-kafkaakkaakka-stream

Handling Streams Termination with Akka Streams Kafka


I'm using the Akka Streams Kafka library to interact with Kafka broker. I have the following stream definition with an UnBounded buffer:

  def producerStream[T: MessageType](producerProperties: Map[String, String]) = {
    val streamSource = source[T](producerProperties)
    val streamFlow = flow[T](producerProperties)
    val streamSink = sink(producerProperties)
    streamSource.via(streamFlow).to(streamSink).run()
  }

I call the producerStream from within an Actor instance. I have a message of type:

case class InitiateStream[T](publisher: ActorRef, anyMessage: T)

In my Actor, I have the following Receive block:

  override def receive: Receive = super.receive orElse {
    case StartProducerStream(publisherActor, DefaultMessage) =>
      producerStream[DefaultMessage](cfg.producerProps)
      // context.become(active)

    case other => println(s"SHIT !! Got unknown message: $other")
  }

The publisherActor will eventually get the messages that are supposed to be pushed to the respective Kafka topic.

Now my question is, do I have to bother about closing the producerStream?

I was thinking about doing a context.become(active(producerStream)) as soon as I start the stream and in the active method, I will handle the stream termination based on a DestroyStream message. Is this needed? What do you guys think?


Solution

  • If you use the Akka Kafka consumer, the source emits events indefinitely as far as events are available in the topic. Why do you need to close the producer?

    I´ll suggest to call the following to start your source:

    def producerSource[T: MessageType](producerProperties: Map[String, String]) = {
        val streamSource = source[T](producerProperties)
        val streamFlow = flow[T](producerProperties)
        val streamSink = sink(producerProperties)
        streamSource.via(streamFlow)
      }
    
    def startStream[A](source: Source[A, NotUsed])(
      implicit am: ActorMaterializer): (UniqueKillSwitch, Future[Done]) = {
       source
       .viaMat(KillSwitches.single)(Keep.right)
       .toMat(Sink.ignore)(Keep.both)
       .run
    }
    
    var streamHandler : Option[UniqueKillSwitch] = None
    
      override def receive: Receive = super.receive orElse {
        case StartProducerStream(publisherActor, DefaultMessage) =>
          streamHandler = Some(producerStream[DefaultMessage](cfg.producerProps))
          context.become(active)
    
        case other => println(s"SHIT !! Got unknown message: $other")
      }
    

    and for the active behavior:

       val active: PartialFunction[Any, Unit] = 
         case DestroyStream => 
            if(streamHandler.isDefined)
                streamHandler.shutdown