Search code examples
apache-kafkaakka-streamakka-http

Connecting akka streams kafka and akka http


I am currently playing around with akka streams and tried the following example.

Get the first element from kafka when requesting a certain HTTP endpoint. This is the code I wrote and its working.

get {
      path("ticket" / IntNumber) { ticketNr =>

        val future = Consumer.plainSource(consumerSettings, Subscriptions.topics("tickets"))
          .take(1)
          .completionTimeout(5 seconds)
          .runWith(Sink.head)

        onComplete(future) {
          case Success(record) => complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, record.value()))
          case _ => complete(HttpResponse(StatusCodes.NotFound))
        }
      }
    }

I am just wondering if this is the ideomatic way of working with (akka) streams. So is there a more "direct" way of connecting the kafka stream to the HTTP response stream?

For example, when POSTing I do this:

val kafkaTicketsSink = Flow[String]
    .map(new ProducerRecord[Array[Byte], String]("tickets", _))
    .toMat(Producer.plainSink(producerSettings))(Keep.right)    

post {
          path("ticket") {
            (entity(as[Ticket]) & extractMaterializer) { (ticket, mat) => {
                val f = Source.single(ticket).map(t => t.description).runWith(kafkaTicketsSink)(mat)
                onComplete(f) { _ =>
                  val locationHeader = headers.Location(s"/ticket/${ticket.id}")
                  complete(HttpResponse(StatusCodes.Created, headers = List(locationHeader)))
                }
              }
            }
          }
        }

Maybe this can also be improved??


Solution

  • You could keep a single, backpressured stream alive using Sink.queue. You can pull an element from the materialized queue every time a request is received. This should give you back one element if available, and backpressure otherwise.

    Something along the lines of:

      val queue = Consumer.plainSource(consumerSettings, Subscriptions.topics("tickets"))
        .runWith(Sink.queue())
    
      get {
        path("ticket" / IntNumber) { ticketNr =>
    
          val future: Future[Option[ConsumerRecord[String, String]]] = queue.pull()
    
          onComplete(future) {
            case Success(Some(record)) => complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, record.value()))
            case _ => complete(HttpResponse(StatusCodes.NotFound))
          }
        }
      }
    

    More info on Sink.queue can be found in the docs.