Search code examples

Mutiny - Propagate completion to parent multi / polling

I am writing a little polling mechanism using Mutiny, part of me learning the library and i am kinda stuck in cancelling the polling when result is found. I tried using the tick() and what i came up with looks like

    .onItem().transformToMultiAndMerge(tick -> {
      System.out.println("Tick:" + tick);
      return Multi.createFrom()
              emitter -> {
                        transactions -> Multi.createFrom().iterable(transactions))
                    .subscribe().with(transaction -> {
                      if (!verification.isOngoing()) {
                      } else {
                        boolean transactionFound = transaction.getAmount().stream().anyMatch(
                            amount -> amount.getQuantity()
                        if (transactionFound) {
    .with(transaction ->,
        x -> x.printStackTrace());

Problem here is that the Multi from ticks() is running forever and the only way i think of to cancel it would be to propagate somehow that the emitter has completed. The case here is that i want to emit, and process only if certain conditions are met.


  • You approach is almost correct, though,

    • there is no need to create a custom MultiEmitter out of an existing Multi (or transformed Uni) as you can leverage the different Multi operators on top of your source service#getTransaction result
    • you missed the EmptyMulti source which will automatically complete downstream subscriber chain and which you can use to signal an absence of valid item (i.e. Transaction)
    • you need to select the first valid item (being non-null) then transform your Multi to Uni which will result in the upstream subscription being cancelled automatically once an item is received

    Here down what the stream pipeline would look like:

            // flat map the ticks to the `service#getTransactions` result
            .transformToMultiAndMerge(tick -> service.getTransactions()
                    // flatten Collection<Transaction> to Multi<Transaction>
                    .transformToMultiAndMerge(transaction -> {
                        if (!verification.isOngoing()) {
                            return Multi.createFrom().failure(new TransactionVerificationException());
                        } else {
                            boolean transactionFound = transaction.getAmount()
                                    .anyMatch(amount -> amount.getQuantity().equals("test"));
                            if (transactionFound) {
                                return Multi.createFrom().item(transaction);
                            } else {
                                return Multi.createFrom().empty();
            .with(transaction ->, x -> x.printStackTrace());