scala, apache-kafka, akka, reactive-kafka

Akka Stream Kafka: complete the stream when the end of the log is reached


I am using Akka Streams Kafka and I am looking for a way to do the following:

  • Initiate the stream from offset x
  • Consume items x, x+1, x+2, … sequentially until the last item
  • Once the last item has been consumed, complete the stream.

The code would look something like this:

Consumer
  .plainSource(consumerSettings, subscription)
  .runForeach(_ => println("got record!"))
  .onComplete {
    case Success(_) => // all items read
    case Failure(error) => // error
  }

and it would complete after the last element has been read. Maybe this is not the way this library is intended to be used. How can I achieve this?
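
For reference, a rough sketch of how consumerSettings and subscription in these snippets might be defined; the broker address, group id, topic name, and starting offset are placeholders:

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

implicit val actorSystem = ActorSystem("example")

val x = 0L // starting offset (placeholder value)

// placeholder broker address and group id
val consumerSettings =
  ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// read partition 0 of "example-topic", starting at offset x
val subscription = Subscriptions.assignmentWithOffset(new TopicPartition("example-topic", 0), x)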


Solution

  • The Akka Kafka consumer works in a "pulling" fashion: it stays alive indefinitely unless an error occurs while connecting to the broker. But when do you consider the stream to be over? Kafka can be seen as a distributed log from which you read messages starting at a given offset, and as long as your client is connected to the broker it will keep running. If you consider the stream finished when no events arrive from Kafka for some time interval (for example), you can use idleTimeout:

      import scala.concurrent.duration._
      import scala.util.{Failure, Success}

      Consumer
        .plainSource(consumerSettings, subscription)
        .idleTimeout(10.seconds)
        .runForeach(record => println(record.value()))
        .onComplete {
          case Success(_) => // all items read
          case Failure(error) =>
            // the stream fails with a TimeoutException if no element arrives within ten seconds
        }
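
    If you want the idle timeout to surface as normal completion rather than a failure, a rough sketch (not part of the original approach) is to map records to Option and recover the TimeoutException into a final None before running the stream:

      import java.util.concurrent.TimeoutException
      import scala.concurrent.duration._
      import scala.util.{Failure, Success}

      Consumer
        .plainSource(consumerSettings, subscription)
        .idleTimeout(10.seconds)
        .map(record => Option(record.value()))
        // turn the idle-timeout failure into one last None element, so the stream completes normally
        .recover { case _: TimeoutException => None }
        .collect { case Some(value) => value }
        .runForeach(value => println(value))
        .onComplete {
          case Success(_)     => println("reached the end of the log")
          case Failure(error) => println(s"stream failed: $error")
        }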
    

    Another possibility is to use a fan-in stage, specifically MergePreferred. We can create a second tick Source that emits events at a fixed interval. The Kafka source gets the preferred port, so as long as elements keep arriving from Kafka the stage will always pull from it. If no element arrives within the tick interval, a "Timeout" string is pushed downstream. Something like:

      import akka.actor.ActorSystem
      import akka.kafka.scaladsl.Consumer
      import akka.stream.{ActorMaterializer, SourceShape}
      import akka.stream.scaladsl.{GraphDSL, Keep, MergePreferred, Source}
      import scala.concurrent.duration._
      import scala.util.{Failure, Success}

      implicit val actorSystem = ActorSystem("test-actor-system")
      implicit val streamMaterializer = ActorMaterializer()
      implicit val ec = actorSystem.dispatcher

      val consumer =
        Consumer
          .plainSource(consumerSettings, subscription)
          .map(_.value())

      // emits "Timeout" every 30 seconds (first tick after 50 ms)
      val tick = Source.tick(50.millis, 30.seconds, "Timeout")

      val source = GraphDSL.create(consumer, tick)(Keep.both) { implicit b => (r1, r2) =>
        import GraphDSL.Implicits._
        // Kafka records go to the preferred port, ticks to the secondary port
        val merge = b.add(MergePreferred[String](1, eagerComplete = false))
        r2 ~> merge.in(0)
        r1 ~> merge.preferred
        SourceShape(merge.out)
      }

      Source
        .fromGraph(source)
        .takeWhile(el => el != "Timeout") // complete the stream on the first "Timeout" tick
        .runForeach(msg => println(msg))
        .onComplete {
          case Success(_)     => println("Stream ended")
          case Failure(error) => println("There was an error")
        }
    

    With takeWhile, the stream stays active as long as elements keep arriving from Kafka.

    This is only one approach. Akka Streams has many other stages, as well as the Graph API, which may handle situations like this more elegantly.