Search code examples
scalaakkaakka-stream

Why does the stream never get triggered?


I have the following stream, that never reach the map after flatMapConcat.

  private def stream[A](ref: ActorRef[ServerHealthStreamer])(implicit system: ActorSystem[A])
  : KillSwitch = {

    implicit val materializer = ActorMaterializer()
    implicit val dispatcher = materializer.executionContext

    system.log.info("=============> Start KafkaDetectorStream <=============")

    val addr = system
      .settings
      .config
      .getConfig("kafka")
      .getString("servers")

    val sink: Sink[ServerHealthEvent, NotUsed] =
      ActorSink.actorRefWithAck[ServerHealthEvent, ServerHealthStreamer, Ack](
        ref = ref,
        onCompleteMessage = Complete,
        onFailureMessage = Fail.apply,
        messageAdapter = Message.apply,
        onInitMessage = Init.apply,
        ackMessage = Ack)

    Source.tick(1.seconds, 5.seconds, NotUsed)
      .flatMapConcat(_ => Source.fromFuture(health(addr)))
      .map {
        case true =>
          KafkaActiveConfirmed
        case false =>
          KafkaInactiveConfirmed
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .to(sink)
      .run()
  }

  private def health(server: String)(implicit executor: ExecutionContext): Future[Boolean] = {
    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server)
    props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "10000")
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")

    Future {
      AdminClient
        .create(props)
        .listTopics()
        .names()
        .get()
    }
      .map(_ => true)
      .recover {
        case _: Throwable => false
      }
  }

What I mean is, that this part:

.map {
  case true =>
    KafkaActiveConfirmed
  case false =>
    KafkaInactiveConfirmed
} 

never gets executed and I do not know the reason. The method health executes as expected.


Solution

  • Try to add .log between flatMapConcat and map to see emited element. log can else log errors and stream cancelation. https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/log.html

    Note, .log using implicit logger

    And your .flatMapConcat(_ => Source.fromFuture(health(addr))) seams triky, try .mapAsyncUnordered(1)(_ => health(addr))