I'm implementing a typical use case in which a client asks for a resource that will be generated asynchronously. Thus, a resourceID is generated and returned right away:
1. CLIENT ---(POST /request-resource)---> SERVER
2. SERVER (Generates resID, launches async process) ---(202 Accepted - resID)---> CLIENT
At this point there is a background task in the SERVER that will eventually produce a result and store it in a database, associated with the resID. The CLIENT asks for the resource periodically, retrying until it is available:
3. CLIENT ---(/resource/resID)---> SERVER (checks in Postgres using reactive driver)
4. SERVER ---(404 - Retry-After 5)---> CLIENT
5. CLIENT ---(/resource/resID)---> SERVER (checks in Postgres using reactive driver)
6. SERVER ---(200 - JSON Payload)---> CLIENT
I thought RSocket would be a perfect fit to avoid this endless CLIENT retry until the resource is available (steps 3 onward).
Which interaction model would be more suitable for this problem and how could I implement it?
Consider a repository ResourceRepository with the following method: Mono<Result> getResult(String resID)
If I chose a request/response interaction model, I'd be in the same situation as before, unless there were a way to have a Mono that retries until there is a result. Is this possible?
With request/stream I could return a Flux<Response> whose elements have response.status=PROCESSING until the Postgres query returns a result; the Flux would then emit an element with response.status=OK and complete. A maximum time would be needed so that the Flux finishes without a result once a configured period has elapsed. In this case, how could I orchestrate this?
I would need to create a Flux that emits periodically (with a maximum timeout), producing an element with no result when the repository returns an empty Mono, or the actual value when the repository has it, and then completing.
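The request/stream idea described above could be sketched roughly as follows. This is only an illustration under assumptions: Status, Response, and pollAsStream are hypothetical names, the result is simplified to a String, and the lookup parameter stands in for a call such as the repository's getResult(resID).

```kotlin
import java.time.Duration
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical types, named after the description in the question.
enum class Status { PROCESSING, OK }
data class Response(val status: Status, val result: String? = null)

// Calls `lookup` every `period`, emitting PROCESSING while it comes back empty.
// The first OK element is emitted and the Flux then completes. If `timeout`
// elapses first, the Flux completes without an OK element.
fun pollAsStream(
    lookup: () -> Mono<String>,
    period: Duration,
    timeout: Duration
): Flux<Response> =
    Flux.interval(Duration.ZERO, period)
        .onBackpressureDrop() // skip ticks if a lookup is slower than the period
        .concatMap { _ ->
            lookup()
                .map { value -> Response(Status.OK, value) }
                .defaultIfEmpty(Response(Status.PROCESSING))
        }
        .takeUntil { it.status == Status.OK } // inclusive: the OK element is emitted
        .take(timeout)
```

A handler annotated with @MessageMapping could return this Flux directly, for example pollAsStream({ repository.getResult(resID) }, Duration.ofSeconds(1), Duration.ofSeconds(30)), assuming the repository result is mapped to the payload type used here.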
Here is a solution to this problem using RSocket with a request/response interaction model that waits until the resource is available in the database. The key was to use the repeatWhenEmpty operator:
@MessageMapping("request-resource")
fun getResourceWebSocket(resourceRequest: ResourceRequest): Mono<Resource> {
    // Delegates to the service method defined below.
    return resourceService.sendResourceRequestMessage(resourceRequest)
}
// Imports assumed: java.time.Duration, java.util.UUID.randomUUID, reactor.core.publisher.Mono
override fun sendResourceRequestMessage(resourceRequest: ResourceRequest): Mono<Resource> {
    val resourceId = randomUUID().toString()
    // Publish the processing request, then start polling for the result.
    return Mono.fromCallable {
        sendKafkaResourceProcessingRequestMessage(resourceId, resourceRequest)
    }.then(pollResourceResponse(resourceId))
}

private fun pollResourceResponse(resourceId: String): Mono<Resource> {
    // Re-query while the repository returns empty: up to 30 repeats, 1 second apart.
    return resourceRepository.findByResourceId(resourceId)
        .repeatWhenEmpty(30) { longFlux ->
            longFlux.delayElements(Duration.ofSeconds(1))
                .doOnNext { logger.info("Repeating {}", it) }
        }
}
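To try the operator in isolation, here is a minimal standalone sketch. The helper pollUntilPresent and the in-memory lookup are hypothetical stand-ins for the repository call. It also shows one possible way to handle the case where the repeat limit is reached: as far as I can tell from the Reactor Javadoc, repeatWhenEmpty with a maxRepeat signals an IllegalStateException at that point, and here it is turned into an empty Mono.

```kotlin
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import reactor.core.publisher.Mono

// Re-subscribes to `lookup` while it completes empty, waiting `delay` between
// attempts. Once the repeat budget is exhausted, repeatWhenEmpty signals an
// IllegalStateException, which is converted here into an empty Mono.
// Caveat: this also swallows an IllegalStateException thrown by `lookup` itself.
fun <T : Any> pollUntilPresent(lookup: Mono<T>, maxRepeats: Int, delay: Duration): Mono<T> =
    lookup
        .repeatWhenEmpty(maxRepeats) { attempts -> attempts.delayElements(delay) }
        .onErrorResume(IllegalStateException::class.java) { Mono.empty<T>() }

fun main() {
    val calls = AtomicInteger()
    // Stand-in for the repository: empty on the first two subscriptions.
    val lookup = Mono.defer<String> {
        if (calls.incrementAndGet() < 3) Mono.empty<String>() else Mono.just("ready")
    }
    println(pollUntilPresent(lookup, 30, Duration.ofMillis(50)).block()) // prints "ready"
}
```

Whether an exhausted poll should complete empty, fail with a timeout error, or return a placeholder value depends on what the RSocket client expects, so the onErrorResume line is the part to adapt.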