
Running an Akka stream in parallel


I have a stream that:

  1. listens for HTTP POST requests carrying a list of events
  2. uses mapConcat to flatten the list into individual stream elements
  3. converts each event into a Kafka record
  4. produces the records with Reactive Kafka (the Akka Streams Kafka producer sink)

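Here is the simplified code:

    import akka.Done
    import akka.http.scaladsl.model.StatusCodes
    import akka.http.scaladsl.server.Directives._
    import akka.kafka.scaladsl.Producer
    import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
    import org.apache.kafka.clients.producer.ProducerRecord

    import scala.concurrent.Future
    import scala.util.{Failure, Success}

    // flow that splits each list of events into individual events
    val splitLines = Flow[List[Evt]].mapConcat(list => list)

    // sink that turns each event into a Kafka record and produces it;
    // `kafka` is the ProducerSettings[Array[Byte], String] defined elsewhere
    val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt]
      .map(evt => new ProducerRecord[Array[Byte], String](evt.eventType, evt.value))
      .toMat(Producer.plainSink(kafka))(Keep.right)

    val routes =
      path("ingest") {
        post {
          // requires a JSON unmarshaller for List[ReactiveEvent] in scope
          (entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList, mat) =>
            val ingest = Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat)
            // respond only once the stream has finished writing to Kafka
            onComplete(ingest) {
              case Success(_)  => complete("OK")
              case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
            }
          }
        }
      }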

Could you point out what runs in parallel and what runs sequentially?
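For context, here is a minimal sketch (not from the original post) of how Akka Streams decides what runs concurrently. By default adjacent operators are fused into a single actor, so a Source, mapConcat, map and sink like the ones in the question handle one element at a time on one actor; .async splits the graph into islands that run separately. Each request to the route materializes its own stream, so separate HTTP requests are already processed concurrently. The sketch assumes Akka 2.6+ and uses hypothetical Int elements and the illustrative names AsyncBoundarySketch and run in place of the real events and Kafka sink:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundarySketch {
  // Assumes Akka 2.6+, where an implicit ActorSystem also supplies the Materializer
  implicit val system: ActorSystem = ActorSystem("async-boundary-sketch")

  def run(batches: List[List[Int]]): Seq[Int] = {
    val done = Source(batches)
      .mapConcat(batch => batch) // fused with the source: both run inside the same actor
      .async                     // boundary: the operators above form one island...
      .map(_ * 10)               // ...and map plus Sink.seq form another, so the two islands
      .runWith(Sink.seq)         // can work on different elements at the same time
    Await.result(done, 10.seconds)
  }

  def shutdown(): Unit = system.terminate()
}
```

This gives pipelining (different stages busy with different elements at once), not several copies of the same stage, and the output order is unchanged.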

I think mapConcat serializes the events in the stream. How could I parallelize the stream so that each step after the mapConcat is processed in parallel?
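If the per-event work can be expressed as a Future (sending a Kafka record is naturally asynchronous), mapAsyncUnordered is one way to run it for several events at once after mapConcat. A minimal sketch, assuming Akka 2.6+; handle is a hypothetical stand-in for building and sending the record, and the parallelism of 4 is arbitrary:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MapAsyncUnorderedSketch {
  // Assumes Akka 2.6+, where an implicit ActorSystem also supplies the Materializer
  implicit val system: ActorSystem = ActorSystem("map-async-unordered-sketch")
  import system.dispatcher // ExecutionContext for the Futures below

  // Hypothetical per-event work standing in for "convert to a Kafka record and send it"
  def handle(evt: String): Future[String] = Future(evt.toUpperCase)

  def run(batches: List[List[String]]): Seq[String] = {
    val done = Source(batches)
      .mapConcat(batch => batch)    // flatten: one element per event
      .mapAsyncUnordered(4)(handle) // up to 4 handle() calls in flight; results emitted as they finish
      .runWith(Sink.seq)
    Await.result(done, 10.seconds)
  }

  def shutdown(): Unit = system.terminate()
}
```

mapAsync(4) would run the same number of futures concurrently but emit the results in input order; mapAsyncUnordered emits them as they complete.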

Would a simple mapAsyncUnordered be sufficient, or should I use the GraphDSL with a Balance and a Merge?
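The GraphDSL alternative fans elements out with Balance to several copies of a worker flow, each marked .async so it runs in its own actor, and joins the results with Merge. This sketch follows the worker-pool pattern from the Akka Streams cookbook; toUpper is a hypothetical worker, the pool size of 3 is arbitrary, and Akka 2.6+ is assumed:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BalanceMergeSketch {
  // Assumes Akka 2.6+, where an implicit ActorSystem also supplies the Materializer
  implicit val system: ActorSystem = ActorSystem("balance-merge-sketch")

  // Fans elements out to `workerCount` copies of `worker`, each in its own async island,
  // and merges their outputs back into one stream (output order is not preserved)
  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val balance = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))
      for (_ <- 1 to workerCount) balance ~> worker.async ~> merge
      FlowShape(balance.in, merge.out)
    })

  // Hypothetical per-event worker standing in for the ProducerRecord conversion
  val toUpper: Flow[String, String, NotUsed] = Flow[String].map(_.toUpperCase)

  def run(batches: List[List[String]]): Seq[String] =
    Await.result(
      Source(batches).mapConcat(batch => batch).via(balancer(toUpper, 3)).runWith(Sink.seq),
      10.seconds)

  def shutdown(): Unit = system.terminate()
}
```

This tends to pay off when the worker is a synchronous, CPU-bound Flow; when the work already returns a Future, mapAsyncUnordered gives similar parallelism with less wiring.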


Solution

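  • In your case I think it will be sequential: by default Akka Streams fuses adjacent operators so that they run inside a single actor, one element at a time, unless you add an asynchronous boundary with .async. Also, you are receiving the whole request before you start pushing data to Kafka. I'd use the extractDataBytes directive, which gives you src: Source[ByteString, Any], and then process it like this: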

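    // src: Source[ByteString, Any], provided by the extractDataBytes directive
    src
      .via(Framing.delimiter(ByteString("\n"), 1024 /* max size of a line */, allowTruncation = true))
      .map(_.utf8String)
      .mapConcat(line => line.split(",").toList)
      .async // async boundary: the operators above run in one actor, the sink in another
      .runWith(kafkaSink)(mat) // kafkaSink must accept String elements here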
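Putting the answer's pieces together, here is a sketch of the route built on extractDataBytes, so that events start flowing to Kafka while the request body is still arriving. It assumes Akka HTTP 10.2 with Akka 2.6, a body of newline-delimited lines of comma-separated values, and a hypothetical sink parameter that accepts one String per field (for example, a flow building ProducerRecords in front of Producer.plainSink). StreamingIngestSketch, fields and ingestRoute are illustrative names:

```scala
import akka.{Done, NotUsed}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Framing, Sink}
import akka.util.ByteString

import scala.concurrent.Future
import scala.util.{Failure, Success}

object StreamingIngestSketch {
  // Splits the raw body into newline-delimited lines, then each line into comma-separated fields
  val fields: Flow[ByteString, String, NotUsed] =
    Framing
      .delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)
      .map(_.utf8String)
      .mapConcat(line => line.split(",").toList)

  // Hypothetical `sink`: accepts one String per field, e.g. ProducerRecord conversion + Producer.plainSink
  def ingestRoute(sink: Sink[String, Future[Done]]): Route =
    path("ingest") {
      post {
        (extractDataBytes & extractMaterializer) { (src, mat) =>
          // Fields are pushed to the sink while the body is still being received
          val ingest = src.via(fields).async.runWith(sink)(mat)
          onComplete(ingest) {
            case Success(_)  => complete("OK")
            case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
          }
        }
      }
    }
}
```

Keeping the parsing in a separate fields flow also makes it easy to test without starting an HTTP server.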