Tags: reactive-programming, spring-webflux, project-reactor

Background task in reactive pipeline (Fire-and-forget)


I have a reactive pipeline to process incoming requests. For each request I need to call a business-relevant function (doSomeRelevantProcessing).

After that is done, I need to notify some external service about what happened. That part of the pipeline should not increase the overall response time. Also, notifying this external system is not business critical: giving a quick response after the main part of the pipeline is finished is more important than making sure the notification is successful.

As far as I have learned, the only way to run something in the background without slowing down the overall process is to subscribe to it directly inside the pipeline, which gives fire-and-forget behavior.

Is there a good alternative to subscribing inside the flatMap? I am a little worried about what might happen if notifying the external service takes longer than the original processing while a lot of requests are coming in at once. Could this lead to memory exhaustion, or cause the overall process to block?

import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.util.context.ContextView

data class Request(val payload: String) // placeholder: the real Request type is not shown

fun runPipeline(incoming: Mono<Request>) = incoming
    .flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
    .flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical

fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing

fun doBackgroundJob(request: Request) = Mono.deferContextual<Unit> { ctx: ContextView ->
    val notification = "notification" // build an object from context

    // this uses non-blocking HTTP (e.g. WebClient), so it can take a second or so
    notifyExternalService(notification).subscribeOn(Schedulers.boundedElastic()).subscribe()

    Mono.just(Unit)
}

fun notifyExternalService(notification: String) = Mono.just(Unit) // might take a while


Solution

I'm answering this assuming that you notify the external service using purely reactive mechanisms, i.e. you're not wrapping a blocking service. If you are, the answer would be different: you'd be bound by the size of your bounded elastic thread pool (by default capped at ten threads per CPU core), which could quickly become overwhelmed if you have hundreds of incoming requests per second.

(If you are using reactive mechanisms, there's no need for the .subscribeOn(Schedulers.boundedElastic()) in your example. It isn't buying you anything, since that scheduler is designed for wrapping legacy blocking calls.)
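
A minimal sketch of the two cases, assuming reactor-core 3 on the classpath. `notifyReactive` and `notifyBlocking` are hypothetical stand-ins (a `Mono.delay` timer in place of the WebClient call, `Thread.sleep` in place of a legacy blocking client), the `CountDownLatch` only exists so the demo can observe completion, and the question's `deferContextual` is left out for brevity:

```kotlin
import java.time.Duration
import java.util.concurrent.CountDownLatch
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Hypothetical non-blocking notifier: Mono.delay waits on a Reactor timer without
// holding a thread, standing in for a WebClient call that takes "a second or so".
fun notifyReactive(notification: String, done: CountDownLatch): Mono<Unit> =
    Mono.delay(Duration.ofMillis(500))
        .doOnNext { done.countDown() }
        .thenReturn(Unit)

// Hypothetical legacy *blocking* notifier, e.g. a synchronous HTTP client.
fun notifyBlocking(notification: String, done: CountDownLatch) {
    Thread.sleep(500)
    done.countDown()
}

// Case 1: purely reactive notifier. No subscribeOn: subscribe() only sets up the
// subscription and returns, so the request is emitted downstream right away.
fun runPipelineReactive(incoming: Mono<String>, done: CountDownLatch): Mono<String> =
    incoming.doOnNext { request ->
        notifyReactive("notification for $request", done).subscribe()
    }

// Case 2: wrapped blocking notifier. subscribeOn(boundedElastic) moves the blocking
// call onto a worker thread; concurrency is then limited by that pool's size.
fun runPipelineBlocking(incoming: Mono<String>, done: CountDownLatch): Mono<String> =
    incoming.doOnNext { request ->
        Mono.fromRunnable<Unit> { notifyBlocking("notification for $request", done) }
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe()
    }
```

In both cases `block()` on the returned Mono hands back the request while the notification is still in flight; the difference is only whether a pooled thread sits waiting for it.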

Could this lead to memory exhaustion...

Only in really extreme cases: the memory held by each in-flight notification is tiny. It's almost certainly not worth worrying about; if you do start seeing memory issues here, you'll almost certainly be hitting other problems elsewhere first.

That being said, I'd recommend adding .timeout(Duration.ofSeconds(5)) or similar before your inner subscribe() call, so that notifications which haven't completed for whatever reason are cancelled after a while. This prevents them from building up.
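
A sketch of that timeout on a fire-and-forget call. `slowNotify` and `notifyInBackground` are hypothetical names; `slowNotify` never answers in time, and the timeout is a parameter so the demo can use 100 ms instead of 5 s. Passing an error consumer to `subscribe()` keeps the timeout failure from surfacing as an unhandled error:

```kotlin
import java.time.Duration
import reactor.core.publisher.Mono

// Hypothetical slow notifier: completes only after 10 s, far past the timeout.
fun slowNotify(notification: String): Mono<Unit> =
    Mono.delay(Duration.ofSeconds(10)).thenReturn(Unit)

// Fire-and-forget with an upper bound on how long each notification stays in flight.
fun notifyInBackground(notification: String, timeout: Duration, onFailure: (Throwable) -> Unit) {
    slowNotify(notification)
        // If nothing arrives within `timeout`, cancel the upstream and signal
        // java.util.concurrent.TimeoutException downstream.
        .timeout(timeout)
        .subscribe(
            { /* success: nothing to do, the response was already sent */ },
            { e -> onFailure(e) } // failure: log and drop, it is not business critical
        )
}
```

In a real service the call would look like `notifyInBackground(notification, Duration.ofSeconds(5)) { e -> log.warn("notification failed", e) }`, where `log` is whatever logger the application already uses.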

...or [can this cause] the overall process to block?

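This one is easier. The short answer is no, it can't: subscribe() on a non-blocking publisher returns as soon as the subscription is set up, and the notification then completes asynchronously without holding the request's thread.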
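
The "no" rests on the assumption stated at the top of the answer. A small sketch makes that visible by timing how long the `subscribe()` call itself holds the calling thread. `blockingNotify` and `nonBlockingNotify` are hypothetical: the first wraps `Thread.sleep` in `Mono.fromCallable` without `subscribeOn`, so the callable runs inline inside `subscribe()`; the second uses a `Mono.delay` timer, so `subscribe()` returns almost immediately:

```kotlin
import java.time.Duration
import reactor.core.publisher.Mono

// Hypothetical blocking notifier (e.g. a synchronous HTTP client), NOT moved off
// the caller's thread with subscribeOn.
fun blockingNotify(): Mono<Unit> = Mono.fromCallable { Thread.sleep(300) }

// Hypothetical non-blocking notifier: a timer stands in for a WebClient call.
fun nonBlockingNotify(): Mono<Unit> = Mono.delay(Duration.ofMillis(300)).thenReturn(Unit)

// Returns how long the subscribe() call itself kept the calling thread busy, in ms.
fun millisSpentInSubscribe(source: Mono<Unit>): Long {
    val start = System.nanoTime()
    source.subscribe()
    return (System.nanoTime() - start) / 1_000_000
}
```

Expect the blocking source to take roughly the full 300 ms inside `subscribe()` and the non-blocking one only a few milliseconds, which is why the blocking case needs `subscribeOn(Schedulers.boundedElastic())`.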