I have an Actor that sends a HTTP POST request using httpRequest => http.singleRequest(httpRequest).pipeTo(self)
in one case BidRequest
message. The the actor receives back the httpResponse in another case HttpResponse
message. In this second case HttpResponse
message I want to change a variable which the first case BidRequest
message will send back. Because the messages are handled asynchronously, when I edit the variable on the second message, the first message already send back the variable with the old state.
I think I need to use the akka.pattern.ask
in some way to not let the message arrive on another case HttpResponse
, but stay in the same case BidRequest
so I can edit the variable in place.
object AuctionClientActor {
def props(bidders: List[String]) = { Props(new AuctionClientActor(bidders)) }
}
class AuctionClientActor(bidders: List[String])
extends Actor with ActorLogging
with BidJsonProtocol with SprayJsonSupport {
import context.dispatcher
implicit val system = context.system
val http = Http(system)
var bidOffer: BidOffer = BidOffer("", 0, "")
def receive = {
case bidRequest@BidRequest(requestId, bid) =>
val content = bidRequest.bid.toJson.toString
val latch = new CountDownLatch(bidders.size)
val listResponseFuture: List[Future[HttpResponse]] = bidders
.map(bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder), // uri = Uri("http://localhost:8081"),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
)
// IF I USE pipeTo HERE THE HttpResponse WILL GO TO ANOTHER CASE
.map(httpRequest => http.singleRequest(httpRequest).pipeTo(self)) // send the request
listResponseFuture.foreach { response =>
Await.result(response, 3 seconds)
response.onComplete {
case Success(value) => latch.countDown // println(s"response success: $value")
case Failure(exception) =>
println(s"response failure: $exception")
latch.countDown
}
}
latch.await(3, TimeUnit.SECONDS)
println("sending response now... BUT bidOffer WAS EDITED IN ANOTHER case thread")
sender() ! Some(bidOffer.content)
bidOffer = BidOffer("", 0, "")
case resp@HttpResponse(StatusCodes.OK, headers, entity, _) =>
log.info(s"received HttpResponse OK(200): $resp")
entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
println("Got response, body: " + body.utf8String)
val newBidOffer = BidOfferConverter.getBidOffer(body.utf8String)
// I SHOULD NOT EDIT bidOffer HERE. INSTEAD I NEED TO EDIT bidOffer ON THE case BidRequest
if (bidOffer.bid == 0) {
println("new")
bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
} else if (newBidOffer.bid > bidOffer.bid) {
println("replace new")
bidOffer = BidOffer(newBidOffer.id, newBidOffer.bid, newBidOffer.content.replace("$price$", newBidOffer.bid.toString))
} else {
println("none")
}
}
case resp@HttpResponse(code, _, _, _) =>
log.info(s"Request failed, response code: $code")
resp.discardEntityBytes()
}
}
I was looking at this answer to transform a List[Future]
to Future[List]
, but when I do that I create a Future[List[Any]]
and not a HttpResponse
anymore.
Next code piece: So I tried to do the way you said but I am creating a List[Future[Future[String]]]
. If I have only one host to do the request it is easy. But because I can have 1, 2, or 3 requests I create a list and the code get complicated. Plus the runFold
from akka-stream
creates another Future
. Could you give a hint how to implement it in the way that you said?
val responseListFuture: List[Future[Future[String]]] = bidders.map { bidder =>
HttpRequest( // create the request
HttpMethods.POST,
uri = Uri(bidder), // uri = Uri("http://localhost:8081 | 8082 | 8083"),
entity = HttpEntity(ContentTypes.`application/json`, content)
)
}
.map { httpRequest =>
http.singleRequest(httpRequest).pipeTo(self) // this creates the first Future
.map { httpResponse =>
println(s"response: $httpResponse")
// this creates the second Future
httpResponse.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map { body =>
println("Got response, body: " + body.utf8String)
// BidOfferConverter.getBidOffer(body.utf8String)
body.utf8String
}
}
}
The short answer is that you can't, short of blocking in a receive
, which is a major no-no.
This has the feeling of an X:Y question. What are the actual goals here? Is it just that you don't want the response sent until all the requests have completed?
If that's what you want, then the approach to take is to map
a future to transform it to a message which includes the information you need to build a response. With doing this, you may not even need the bidOffer
variable.
Future.sequence
will collapse a Seq[Future[A]]
(among other collection types) into a Future[Seq[A]]
(failing if any of the futures fail: that may not be what you're looking for, in which case other combinators in the Future
companion object might be more what you're looking for).