Tags: scala, akka, akka-stream, akka-http

Akka streams Source.repeat stops after 100 requests


I am working on the stream processing system below, which grabs frames from one source, processes them, and sends them to another. I'm using a combination of akka-streams and akka-http through their Scala API. The pipeline is very short, but I can't locate where the system decides to stop after precisely 100 requests to the endpoint.

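import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object frameProcessor extends App {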
  implicit val system: ActorSystem = ActorSystem("VideoStreamProcessor")
  val decider: Supervision.Decider = _ => Supervision.Restart
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val dispatcher: ExecutionContextExecutor = system.dispatcher
  val http = Http(system)
  val sourceConnectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = http.outgoingConnection(sourceUri)

  val byteFlow: Flow[HttpResponse, Future[ByteString], NotUsed] =
    Flow[HttpResponse].map(_.entity.dataBytes.runFold(ByteString.empty)(_ ++ _))

  Source.repeat(HttpRequest(uri = sourceUri))
    .via(sourceConnectionFlow)
    .via(byteFlow)
    .map(postFrame)
    .runWith(Sink.ignore)
    .onComplete(_ => system.terminate())

  def postFrame(imageBytes: Future[ByteString]): Unit = {
    imageBytes.onComplete{
      case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
      case Failure(_) => system.log.error("failed to post image!")
    }
  }
}

For reference, I'm using akka-streams version 2.5.19 and akka-http version 10.1.7. No error is thrown, there are no error codes on the source server where the frames come from, and the program exits with exit code 0.

My application.conf is as follows:

logging = "DEBUG"

Exactly 100 units are processed every time.

Thanks!

Edit

Added logging to the stream like so

.onComplete{
  case Success(res) => {
    system.log.info(res.toString)
    system.terminate()
  }
  case Failure(res) => {
    system.log.error(res.getMessage)
    system.terminate()
  }

}

I occasionally received a connection reset exception, but not consistently. Otherwise the stream completes with Done.

Edit 2

Using .mapAsync(1)(postFrame) I get the same Success(Done) after precisely 100 requests. Additionally, when I check the nginx server's access.log and error.log, there are only HTTP 200 responses.

To use mapAsync, I had to modify postFrame as follows:

def postFrame(imageBytes: Future[ByteString]): Future[Unit] = {
  imageBytes.onComplete{
    case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
    case Failure(_) => system.log.error("failed to post image!")
  }
  // Returns right away; this Future does not wait for imageBytes
  Future.successful(())
}
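
The version of postFrame above returns a future that does not depend on imageBytes, so mapAsync(1) cannot use it to wait for the frame. A minimal sketch of a variant that ties the result to imageBytes follows. To keep it standard-library only, Array[Byte] stands in for ByteString and println stands in for system.log; both substitutions, and the object name PostFrameSketch, are assumptions for illustration.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object PostFrameSketch {
  // Logs the outcome, then yields a Future that completes only after
  // imageBytes has completed. A failed read is logged and recovered, so the
  // returned Future still succeeds and would not fail a mapAsync stage.
  def postFrame(imageBytes: Future[Array[Byte]])(
      implicit ec: ExecutionContext): Future[Unit] =
    imageBytes
      .andThen {
        case Success(res) => println(s"post frame. ${res.length} bytes")
        case Failure(_)   => println("failed to post image!")
      }
      .map(_ => ())              // discard the bytes, keep the completion signal
      .recover { case _ => () }  // swallow the failure after it has been logged
}
```

Whether a failed frame should be swallowed or allowed to fail the stream is a design choice; dropping the recover line would propagate the failure instead.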

Solution

  • I believe I have found the answer in the Akka docs on delayed restarts with a backoff operator. Instead of sourcing directly from an unstable remote connection, I wrap the source in RestartSource.withBackoff rather than RestartSource.onFailuresWithBackoff, because the stream completes normally instead of failing and only withBackoff restarts on completion. The modified stream looks like this:

    import akka.stream.scaladsl.RestartSource
    import scala.concurrent.duration._

    val restartSource = RestartSource.withBackoff(
      minBackoff = 100.milliseconds,
      maxBackoff = 1.seconds,
      randomFactor = 0.2
    ){ () =>
      Source.single(HttpRequest(uri = sourceUri))
        .via(sourceConnectionFlow)
        .via(byteFlow)
        .mapAsync(1)(postFrame)
    }
    restartSource
      .runWith(Sink.ignore)
      .onComplete { x =>
        println(x)
        system.terminate()
      }
    

    I was not able to find the source of the problem but it seems this will work.
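
One plausible cause, which is not confirmed anywhere in this thread, is that the nginx server closes each keep-alive connection after a fixed number of requests (its keepalive_requests directive defaulted to 100 in older releases). A connection-level flow from outgoingConnection completes when its connection closes, which would end the stream with Done. Under that assumption, a sketch of a variant that keeps Source.repeat inside the restart wrapper, so each connection is reused until the server closes it, could look like the following. The names RestartingFrameSource, frames, and newConnection are hypothetical; newConnection is a factory so that every restart gets a fresh connection flow.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, RestartSource, Source}
import akka.util.ByteString

import scala.concurrent.duration._

object RestartingFrameSource {
  // Emits one ByteString per response body. When the inner stream completes
  // or fails (for example because the server closed the connection), it is
  // rebuilt after a backoff delay with a new connection flow.
  def frames(
      uri: String,
      newConnection: () => Flow[HttpRequest, HttpResponse, Any]
  )(implicit mat: Materializer): Source[ByteString, NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 100.milliseconds,
      maxBackoff = 1.second,
      randomFactor = 0.2
    ) { () =>
      Source
        .repeat(HttpRequest(uri = uri))
        .via(newConnection())
        // Read each entity completely before the next response is handled
        .mapAsync(1)(_.entity.dataBytes.runFold(ByteString.empty)(_ ++ _))
    }
}
```

With the setup from the question, this might be run as RestartingFrameSource.frames(sourceUri, () => http.outgoingConnection(sourceUri)).runWith(Sink.ignore). Unlike the Source.single version, this variant only pays the backoff delay when a connection actually ends, not after every request.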