java, quarkus, mutiny

How to Reactively Emit Events with Dynamic Delay using Mutiny in Java?


How can I use Mutiny to periodically poll a data source for new events and delay the emission of each event by a dynamic amount, so that every event is emitted exactly one second after the time stored in its date property?

Each item in the data source has a date property holding its time of entry, and the goal is to emit each event one second after that time. I am considering a Uni<List>, since database queries are "one-shot" operations, but I would prefer a Multi so that the parent class can combine several streams.

    private LocalDateTime timeOfLastQuery;

    public Multi<Event> getNewEvents() {
        final var eventStream = Multi.createFrom().iterable(getNewEvents(timeOfLastQuery))
                // TODO: delay each item by millisToOneSecondOld
                .onItem().transform(mapper::toEvent);

        timeOfLastQuery = LocalDateTime.now();

        return eventStream;
    }

    // Milliseconds left until the entry is one second old (0 if it already is).
    private Long millisToOneSecondOld(LocalDateTime timeOfEntry) {
        final var aSecondAgo = LocalDateTime.now().minusSeconds(1);
        final var duration = Duration.between(aSecondAgo, timeOfEntry);
        return coerceAtLeast(duration.toMillis(), 0);
    }

    public static long coerceAtLeast(long x, long minimum) {
        return Math.max(x, minimum);
    }

Solution

  • You can implement your delay as follows:

    .call(ignored -> Uni.createFrom().nullItem().onItem().delayIt().by(getDuration()))
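
    For each item, call subscribes to the Uni returned by the function and waits for that Uni to emit before propagating the original item downstream. The trick is to delay the emission of the null item, which in turn holds back the original item. Here getDuration() stands in for whatever computes the delay, such as millisToOneSecondOld from the question.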
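
The per-item delay itself can be computed without Mutiny. Below is a minimal sketch using only java.time. The class name DelayCalculator, the method name untilOneSecondOld and the Clock parameter are assumptions made for illustration and testability; they do not appear in the original post.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical helper; the names are illustrative and not part of the original post. */
final class DelayCalculator {

    // The age an entry must reach before its event is emitted.
    private static final Duration TARGET_AGE = Duration.ofSeconds(1);

    private DelayCalculator() {
    }

    /**
     * Returns how long to wait until an entry created at timeOfEntry is
     * one second old, or Duration.ZERO if it is already at least that old.
     * The clock is injected so the calculation can be tested with a fixed time.
     */
    static Duration untilOneSecondOld(LocalDateTime timeOfEntry, Clock clock) {
        LocalDateTime emitAt = timeOfEntry.plus(TARGET_AGE);
        Duration remaining = Duration.between(LocalDateTime.now(clock), emitAt);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }
}
```

To wire this into the pipeline, the result would replace getDuration() in the call(...) above, with the entry time read from the item and Clock.systemDefaultZone() as the clock. One caveat, stated from memory rather than from the post: delayIt().by(...) appears to reject zero or negative durations, so an item that is already old enough should probably skip the delay and return an immediately completed Uni instead.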