Search code examples
javarx-javareactive-programmingspring-webfluxreactor

How to push message to upstream using reactive Flux/Mono whenever they are ready than polling in interval for status?


Trying to push message to upstream whenever they are available/ready and close connection after flush, rather than polling for message using spring reactive flux interval.

@GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {

return Flux.<String>interval(Duration.ofSeconds(3))
                .map(status -> {
                    if (getSomething(randomId).
                            equalsIgnoreCase("value"))
                        return "value";
                    return "ping";
                }).take(Duration.ofSeconds(60)).timeout(Duration.ofSeconds(60));
    }

Kafka listener updates randomId value in a map as it gets, getSomething method checks for randomId value in intervals in map. So instead of checking in intervals and storing data in map, i want to push message to client when listener receives.


Solution

  • I built solution based on this stackoverflow Spring 5 Web Reactive - Hot Publishing - How to use EmitterProcessor to bridge a MessageListener to an event stream answer, used EmitterProcessor to hot publish message as they are available.

    Here is the sample code

    @GetMapping(value = "/getValue/{randomId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> statusCheck(@PathVariable("randomId") @NonNull String randomId) {
        EmitterProcessor<String> emitterProcessor = EmitterProcessor.create();
        Flux<String> autoConnect = emitterProcessor.publish().autoConnect();
        FluxSink<String> sink = emitterProcessor.sink();
        //storing randomId and processor sink details
        randomIdMap.putIfAbsent(randomId, emitterProcessor);
        /** This will return ping status to notify client as 
        connection is alive until the randomId message received. **/
        sendPingStatus(sink, randomId);
    }
    

    Below method shows how to pushes message to client as it arrives on kafka consumer and close flux connection.

    @KafkaListener(topics = "some-subscription-id",
            containerFactory = "kafkaListenerContainerFactory")
    public void pushMessage(SomeMessage message, Acknowledgment acknowledgment) {
        EmitterProcessor emitter = randomIdMap.get("randomId");
        if (emitter != null ) {
            emitter.onNext(message);
            emitter.onComplete();
            randomIdMap.remove("randomId");
            acknowledgment.acknowledge();
        }
    }