I have the following stream, which never reaches the map stage
after flatMapConcat:
private def stream[A](ref: ActorRef[ServerHealthStreamer])(implicit system: ActorSystem[A])
  : KillSwitch = {
  implicit val materializer = ActorMaterializer()
  implicit val dispatcher = materializer.executionContext
  system.log.info("=============> Start KafkaDetectorStream <=============")
  val addr = system
  val sink: Sink[ServerHealthEvent, NotUsed] =
    ActorSink.actorRefWithAck[ServerHealthEvent, ServerHealthStreamer, Ack](
      ref = ref,
      onCompleteMessage = Complete,
      onFailureMessage = Fail.apply,
      messageAdapter = Message.apply,
      onInitMessage = Init.apply,
      ackMessage = Ack)
  Source.tick(1.seconds, 5.seconds, NotUsed)
    .flatMapConcat(_ => Source.fromFuture(health(addr)))
    .map {
      case true =>
      case false =>
private def health(server: String)(implicit executor: ExecutionContext): Future[Boolean] = {
  val props = new Properties
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server)
  props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "10000")
  props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")
  Future {
  .map(_ => true)
  .recover {
    case _: Throwable => false
What I mean is that this part:
    .map {
      case true =>
      case false =>
never gets executed and I do not know the reason. The method health
executes as expected.
Try adding .log between flatMapConcat and map to see the emitted
elements. log can also log errors and stream cancellation.
Note that .log uses an implicit logger (a LoggingAdapter).
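A minimal sketch of that suggestion, under some assumptions: it uses a classic ActorSystem with ActorMaterializer (Akka 2.5-style API, deprecated in 2.6), a hypothetical stub for health that always succeeds, and shortened tick intervals so it finishes quickly. By default .log emits elements and completion at debug level, so the sketch raises the element level to info to make them visible without changing the logging configuration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object LogStageSketch {
  // Hypothetical stand-in for the question's health(addr); always reports healthy.
  def health(server: String): Future[Boolean] = Future.successful(true)

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("log-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // .log resolves its logger from this implicit LoggingAdapter.
    implicit val adapter: LoggingAdapter = Logging(system, "health-stream")

    val result = Source.tick(100.millis, 200.millis, NotUsed) // shortened intervals for the sketch
      .flatMapConcat(_ => Source.fromFuture(health("localhost:9092"))) // address is an assumption
      .log("after-flatMapConcat") // logs elements, failures, and completion/cancellation
      .addAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
      .map {
        case true  => "healthy"
        case false => "unhealthy"
      }
      .take(2) // stop after two ticks so the sketch terminates
      .runWith(Sink.seq)

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If nothing is logged under the "after-flatMapConcat" name, the future returned by health is probably not completing; if a failure is logged, the stream was torn down before map could run.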
Also, your .flatMapConcat(_ => Source.fromFuture(health(addr)))
seems tricky;
try .mapAsyncUnordered(1)(_ => health(addr)) instead.
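The mapAsyncUnordered variant could look roughly like the sketch below. The same assumptions apply: a classic ActorSystem with ActorMaterializer, a hypothetical health stub in place of the Kafka AdminClient call, a made-up server address, and short tick intervals. With parallelism 1, at most one health check is in flight at a time. A failed future fails the stream, so the recover inside health is what keeps the stream alive.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control.NonFatal

object MapAsyncSketch {
  // Hypothetical stand-in for the question's health check: the real one
  // would query Kafka; this one only checks that the address is non-empty.
  def health(server: String)(implicit ec: ExecutionContext): Future[Boolean] =
    Future(server.nonEmpty).recover { case NonFatal(_) => false }

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // execution context for the futures

    val result = Source.tick(100.millis, 200.millis, NotUsed) // shortened intervals for the sketch
      .mapAsyncUnordered(1)(_ => health("localhost:9092")) // emits the Boolean once the future completes
      .map {
        case true  => "healthy"
        case false => "unhealthy"
      }
      .take(2) // stop after two ticks so the sketch terminates
      .runWith(Sink.seq)

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```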