Tags: java, redis, quarkus, smallrye

Is recursion necessary in Quarkus/Mutiny?


I'm trying to use Redis' SCAN feature, which needs to be called repeatedly until the returned cursor is "0".

This approach works but feels overly complicated - is there a way to do it without recursion?

// Accumulates keys into `results`, recursing until Redis returns cursor "0".
Uni<List<String>> callScanRecursively(String cursorNum, List<String> results) {
  List<String> args = List.of(cursorNum, "MATCH", "*");
  return reactiveRedisClient.scan(args)
      .onItem()
      .transformToUni(item -> {
            // item.get(0) is the next cursor; item.get(1) holds this page's keys.
            item.get(1).forEach(k -> results.add(k.toString()));

            String nextCursorNum = item.get(0).toString();
            if (nextCursorNum.equals("0")) {
              return Uni.createFrom().item(results);
            }
            return callScanRecursively(nextCursorNum, results);
          }
      );
}

List<String> results = new ArrayList<>();
String startingCursorRedis = "0";
callScanRecursively(startingCursorRedis, results)
    .subscribe()
    .with(item ->
        LOGGER.info(item)
    );

Solution

  • You can use Multi.createBy().repeating() (a MultiRepeat) to call ReactiveRedisClient#scan repeatedly, completing a Uni emitter with each page of keys, until the returned cursor, response.get(0).toString(), equals "0":

    Uni Emitter

    Multi<String> callScan(String initialCursor) {
        // Holds the current cursor; set to null once Redis reports cursor "0".
        final AtomicReference<String> cursorReference = new AtomicReference<>(initialCursor);
        return Multi.createBy()
                .repeating()
                .<AtomicReference<String>, List<String>>uni(
                        () -> cursorReference,
                        (state, emitter) -> reactiveRedisClient.scan(List.of(state.get(), "MATCH", "*"))
                                .subscribe()
                                .with(response -> {
                                    String nextCursor = response.get(0).toString();
                                    // Cursor "0" marks the last page: clear the state to stop repeating.
                                    state.set(nextCursor.equals("0") ? null : nextCursor);
                                    List<String> keysList = new ArrayList<>();
                                    response.get(1).forEach(k -> keysList.add(k.toString()));
                                    emitter.complete(keysList);
                                }, emitter::fail) // pass scan failures downstream instead of dropping them
                )
                // Stop repeating once the cursor has been cleared.
                .whilst(o -> cursorReference.get() != null)
                .flatMap(l -> Multi.createFrom().iterable(l));
    }
    

    Uni

    Or even simpler if you don't need more control around the emitted items:

    Multi<String> callScan(String initialCursor) {
        // Uses the injected reactiveRedisClient; the reference holds the current cursor
        // and is set to null once Redis reports cursor "0".
        final AtomicReference<String> cursorReference = new AtomicReference<>(initialCursor);
        return Multi.createBy()
                .repeating()
                .uni(
                        () -> cursorReference,
                        (state) -> reactiveRedisClient.scan(List.of(state.get(), "MATCH", "*"))
                                .invoke(response -> {
                                    String nextCursor = response.get(0).toString();
                                    state.set(nextCursor.equals("0") ? null : nextCursor);
                                })
                                .map(response -> {
                                    List<String> keysList = new ArrayList<>();
                                    response.get(1).forEach(k -> keysList.add(k.toString()));
                                    return keysList;
                                })
                )
                // Stop repeating once the cursor has been cleared.
                .whilst(o -> cursorReference.get() != null)
                .flatMap(l -> Multi.createFrom().iterable(l));
    }
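
To see the stopping rule on its own, here is a minimal plain-JDK sketch of the same cursor loop, with no Redis or Mutiny involved. Everything in it is hypothetical and only for illustration: ScanLoopSketch, Page and fakeScan are made-up names, and the fake cursor is simply an offset into a list, whereas real Redis cursors are opaque values. It shows the shape both versions above follow: fetch a page, keep its keys, move to the next cursor, and stop once the cursor comes back as "0".

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration only; none of these names exist in Quarkus, Mutiny or Redis.
class ScanLoopSketch {

    /** One SCAN-style reply: the next cursor plus the keys found in this page. */
    static final class Page {
        final String nextCursor;
        final List<String> keys;

        Page(String nextCursor, List<String> keys) {
            this.nextCursor = nextCursor;
            this.keys = keys;
        }
    }

    /**
     * Hypothetical stand-in for a SCAN call over an in-memory key list.
     * Assumes pageSize is positive. The cursor is a plain offset here,
     * which is NOT how real Redis cursors work.
     */
    static Page fakeScan(List<String> allKeys, String cursor, int pageSize) {
        int start = Integer.parseInt(cursor);
        int end = Math.min(start + pageSize, allKeys.size());
        // Like Redis, report cursor "0" once the iteration is complete.
        String next = end >= allKeys.size() ? "0" : String.valueOf(end);
        return new Page(next, allKeys.subList(start, end));
    }

    /** Iterative cursor loop: keep scanning until the returned cursor is "0". */
    static List<String> scanAll(List<String> allKeys, int pageSize) {
        List<String> results = new ArrayList<>();
        String cursor = "0";
        do {
            Page page = fakeScan(allKeys, cursor, pageSize);
            results.addAll(page.keys);   // the final page's keys are kept too
            cursor = page.nextCursor;
        } while (!"0".equals(cursor));
        return results;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d", "e");
        System.out.println(scanAll(keys, 2));
    }
}
```

The do/while form mirrors why the reactive versions update the cursor state before the whilst check: the page that comes back with cursor "0" can still carry keys, so it has to be collected before the loop ends.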