scala, akka, akka-stream, akka-kafka

Akka Stream TCP + Akka Stream Kafka producer not stopping, not publishing messages, and not erroring out


I have the following stream:

Source(IndexedSeq(ByteString.empty))
.via(
  Tcp().outgoingConnection(bsAddress, bsPort)
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

It works fine for a while, and I can consume the messages published to the Kafka topic. But from time to time, at apparently random intervals, no more messages are published, and this code does not log any errors (printAndByeBye prints the message passed to it and terminates the actor system). After restarting the app, the messages flow again.

Any idea on how to know what is going on here?

Edit: I put Kamon on it and I could see the following behavior:

[Charts: Mailbox Size per Actor; Time in Mailbox per Actor; Processing Time per Actor]

It looks like something stopped without signalling that the stream should stop, but I don't know how to make that explicit and stop the stream.


Solution

  • The stream was not failing, but the TCP stream went idle because the device publishing the data stopped sending after a while without dropping the connection. Instead of using the simpler:

    Tcp().outgoingConnection(bsAddress, bsPort)
    

    I ended up using the overload that takes explicit timeouts:

    def outgoingConnection(
      remoteAddress:  InetSocketAddress,
      localAddress:   Option[InetSocketAddress]           = None,
      options:        immutable.Traversable[SocketOption] = Nil,
      halfClose:      Boolean                             = true,
      connectTimeout: Duration                            = Duration.Inf,
      idleTimeout:    Duration                            = Duration.Inf
    ): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ???
    

    so

    Tcp().outgoingConnection(bsAddress, bsPort)
    

    became

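    import java.net.InetSocketAddress
    import scala.concurrent.duration._

    // Give up connecting after 1 s; fail the flow if no bytes pass for 2 s.
    val connectTimeout: Duration = 1.second
    val idleTimeout: Duration = 2.seconds
    Tcp().outgoingConnection(
      remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
      connectTimeout = connectTimeout,
      idleTimeout = idleTimeout
    )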
    

    By specifying idleTimeout, the flow starts failing once the connection goes idle, so a new flow can be started in its place.
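
One possible way to do the "start a new flow" part is sketched below. It is not from the original answer. It assumes Akka's RestartSource is available (Akka 2.5.4 or later; from 2.6.10 the same method takes a RestartSettings instead of three arguments). The object name RestartingTcpSource, the helper lines, and the backoff values are made up for illustration.

```scala
import java.net.InetSocketAddress

import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, RestartSource, Source, Tcp}
import akka.util.ByteString

object RestartingTcpSource {

  // Hypothetical helper (not part of the original post): emits the
  // newline-delimited lines sent by the device and reconnects with backoff
  // whenever the inner stream fails or completes. An idle connection counts
  // as a failure here because idleTimeout is set on the TCP flow.
  def lines(host: String, port: Int)(implicit system: ActorSystem): Source[String, NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 1.second,   // assumed values, tune for the device
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ) { () =>
      Source
        .single(ByteString.empty)
        .via(
          Tcp().outgoingConnection(
            remoteAddress = InetSocketAddress.createUnresolved(host, port),
            connectTimeout = 1.second,
            idleTimeout = 2.seconds
          )
        )
        .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
        .map(_.utf8String)
    }
}
```

With an implicit ActorSystem in scope, RestartingTcpSource.lines(bsAddress, bsPort) could then replace the Source(...).via(...) part of the stream in the question, followed by the same .map to ProducerRecord and the same Producer.plainSink. Note that with this wrapper an idle timeout no longer reaches onComplete, since the failure is absorbed by the restart.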