Search code examples
scalaakkaakka-streamakka-http

How to get the Httpresponse of http.singleRequest(httpRequest) inside the same thread on an Actor?


I have an Actor that sends a HTTP POST request using httpRequest => http.singleRequest(httpRequest).pipeTo(self)in one case BidRequest message. The the actor receives back the httpResponse in another case HttpResponse message. In this second case HttpResponse message I want to change a variable which the first case BidRequest message will send back. Because the messages are handled asynchronously, when I edit the variable on the second message, the first message already send back the variable with the old state.

I think I need to use the akka.pattern.ask in some way to not let the message arrive on another case HttpResponse, but stay in the same case BidRequest so I can edit the variable in place.

object AuctionClientActor {
  def props(bidders: List[String]) = { Props(new AuctionClientActor(bidders)) }
}

class AuctionClientActor(bidders: List[String])
  extends Actor with ActorLogging
    with BidJsonProtocol with SprayJsonSupport {

  import context.dispatcher

  implicit val system = context.system
  val http = Http(system)

  var bidOffer: BidOffer = BidOffer("", 0, "")

  def receive = {
    case bidRequest@BidRequest(requestId, bid) =>
      val content = bidRequest.bid.toJson.toString

      val latch = new CountDownLatch(bidders.size)

      val listResponseFuture: List[Future[HttpResponse]] = bidders
        .map(bidder =>
          HttpRequest( // create the request
            HttpMethods.POST,
            uri = Uri(bidder), // uri = Uri("http://localhost:8081"),
            entity = HttpEntity(ContentTypes.`application/json`, content)
          )
        )
        // IF I USE pipeTo HERE THE HttpResponse WILL GO TO ANOTHER CASE
        .map(httpRequest => http.singleRequest(httpRequest).pipeTo(self)) // send the request

      listResponseFuture.foreach { response =>
        Await.result(response, 3 seconds)
        response.onComplete {
          case Success(value) => latch.countDown // println(s"response success: $value")
          case Failure(exception) =>
            println(s"response failure: $exception")
            latch.countDown
        }
      }
      latch.await(3, TimeUnit.SECONDS)
      println("sending response now... BUT bidOffer WAS EDITED IN ANOTHER case thread")
      sender() ! Some(bidOffer.content)
      bidOffer = BidOffer("", 0, "")
    case resp@HttpResponse(StatusCodes.OK, headers, entity, _) =>
      log.info(s"received HttpResponse OK(200): $resp")
      entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
        println("Got response, body: " + body.utf8String)
        val newBidOffer = BidOfferConverter.getBidOffer(body.utf8String)
        // I SHOULD NOT EDIT bidOffer HERE. INSTEAD I NEED TO EDIT bidOffer ON THE case BidRequest
        if (bidOffer.bid == 0) {
          println("new")
          bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
        } else if (newBidOffer.bid > bidOffer.bid) {
          println("replace new")
          bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
        } else {
          println("none")
        }
      }
    case resp@HttpResponse(code, _, _, _) =>
      log.info(s"Request failed, response code: $code")
      resp.discardEntityBytes()
  }
}

I was looking at this answer to transform a List[Future] to Future[List], but when I do that I create a Future[List[Any]] and not a HttpResponse anymore.

Next code piece: So I tried to do the way you said but I am creating a List[Future[Future[String]]]. If I have only one host to do the request it is easy. But because I can have 1, 2, or 3 requests I create a list and the code get complicated. Plus the runFold from akka-stream creates another Future. Could you give a hint how to implement it in the way that you said?

      val responseListFuture: List[Future[Future[String]]] = bidders.map { bidder =>
        HttpRequest( // create the request
          HttpMethods.POST,
          uri = Uri(bidder), // uri = Uri("http://localhost:8081 | 8082 | 8083"),
          entity = HttpEntity(ContentTypes.`application/json`, content)
        )
      }
        .map { httpRequest =>
          http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future
            .map { httpResponse =>
              println(s"response: $httpResponse")
              // this creates the second Future
              httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
                println("Got response, body: " + body.utf8String)
                // BidOfferConverter.getBidOffer(body.utf8String)
                body.utf8String
              }
            }
        }

Solution

  • The short answer is that you can't, short of blocking in a receive, which is a major no-no.

    This has the feeling of an X:Y question. What are the actual goals here? Is it just that you don't want the response sent until all the requests have completed?

    If that's what you want, then the approach to take is to map a future to transform it to a message which includes the information you need to build a response. With doing this, you may not even need the bidOffer variable.

    Future.sequence will collapse a Seq[Future[A]] (among other collection types) into a Future[Seq[A]] (failing if any of the futures fail: that may not be what you're looking for, in which case other combinators in the Future companion object might be more what you're looking for).