
Reactor framework confusion with Assembly time and subscription time (when to call subscribe)


I'm confused about assembly time versus subscription time. I know Monos are lazy and do not get executed until they are subscribed to. Below is a method.

    public Mono<UserbaseEntityResponse> getUserbaseDetailsForEntityId(String id) {
        GroupRequest request = ImmutableGroupRequest
                .builder()
                .cloudId(id)
                .build();

        Mono<List<GroupResponse>> response = ussClient.getGroups(request);
        List<UserbaseEntityResponse.GroupPrincipal> groups = new CopyOnWriteArrayList<>();


        response.flatMapIterable(elem -> elem)
                .toIterable().iterator().forEachRemaining(
                    groupResponse -> {
                        groupResponse.getResources().iterator().forEachRemaining(
                            resource -> {
                                groups.add(ImmutableGroupPrincipal
                                        .builder()
                                        .groupId(resource.getId())
                                        .name(resource.getDisplayName())
                                        .addAllUsers(convertMemebersToUsers(resource))
                                        .build());
                            }
                        );
                    }
                );
        log.debug("Response Object - " + groups.toString());
        ImmutableUserbaseEntityResponse res = ImmutableUserbaseEntityResponse
                .builder()
                .userbaseId(id)
                .addAllGroups(groups)
                .build();


        Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
        .parallel()
                .runOn(Schedulers.parallel())
                .doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
                .sequential();

        return Mono.just(res);
    }

The line Mono<List<GroupResponse>> response = ussClient.getGroups(request); gets executed without my calling subscribe. However, the code below does not get executed unless I call subscribe on it.

    Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
            .parallel()
            .runOn(Schedulers.parallel())
            .doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal) groupPrincipal, res.getUserbaseId()))
            .sequential();

Can I get some more input on assembly time vs subscription?


Solution

  • "Nothing happens until you subscribe" isn't quite true in all cases. There are three scenarios in which a publisher (Mono or Flux) will be executed:

    • You subscribe;
    • You block;
    • The publisher is "hot".

    Note that the above scenarios all apply to an entire reactive chain: if I subscribe to a publisher, everything upstream of it (everything that publisher depends on) also executes. That's why frameworks can, and should, call subscribe when they need to, causing the reactive chain defined in a controller to execute.
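
To make the first two scenarios concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class name, the counter, and fetchGroups() are hypothetical; fetchGroups() is only a stand-in for a cold publisher such as the one returned by ussClient.getGroups(request), not the real client.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class AssemblyVsSubscription {
    // Counts how many times the simulated "remote call" actually runs.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for a cold publisher like ussClient.getGroups(request).
    static Mono<String> fetchGroups() {
        return Mono.fromCallable(() -> {
            calls.incrementAndGet();
            return "groups";
        });
    }

    public static void main(String[] args) {
        // Assembly time: the chain is only described, nothing has run yet.
        Mono<String> pipeline = fetchGroups().map(String::toUpperCase);
        System.out.println("after assembly:  " + calls.get());

        // Subscription time: subscribing runs the whole chain, upstream included.
        pipeline.subscribe(value -> System.out.println("received: " + value));
        System.out.println("after subscribe: " + calls.get());

        // block() is "subscribe and wait", so it runs the cold chain once more.
        String result = pipeline.block();
        System.out.println("after block:     " + calls.get() + " (" + result + ")");
    }
}
```

The counter stays at zero after assembly and only increases once something subscribes or blocks. Because the stand-in is a cold publisher, each subscription runs it again.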

    In your case it's actually the second of these: you're blocking, which is essentially a "subscribe and wait for the result(s)". Usually the methods that block are clearly labelled, but that's not always the case. Here it's the toIterable() method on Flux doing the blocking:

    Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.

    But ah, you say, I'm not calling Iterator.next() - what gives?!

    Well, implicitly you are by calling forEachRemaining():

    The default implementation behaves as if:

     while (hasNext())
         action.accept(next());
    

    ...and as per the above rule, since ussClient.getGroups(request) is upstream of this blocking call, it gets executed.
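
The implicit next() calls can be observed with plain Java, no Reactor needed. This is a small sketch in which RecordingIterator is a hypothetical helper that logs every call made to it and relies on the default Iterator.forEachRemaining implementation quoted above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ForEachRemainingDemo {
    // Hypothetical helper: records each call and does not override
    // forEachRemaining, so the default implementation is used.
    static class RecordingIterator implements Iterator<String> {
        final List<String> log = new ArrayList<>();
        private final Iterator<String> delegate;

        RecordingIterator(List<String> items) {
            this.delegate = items.iterator();
        }

        @Override
        public boolean hasNext() {
            log.add("hasNext");
            return delegate.hasNext();
        }

        @Override
        public String next() {
            // For the Iterable returned by Flux.toIterable(), the Javadoc
            // quoted above says this is the call that blocks.
            log.add("next");
            return delegate.next();
        }
    }

    public static void main(String[] args) {
        RecordingIterator it = new RecordingIterator(List.of("a", "b"));
        List<String> seen = new ArrayList<>();
        it.forEachRemaining(seen::add);
        System.out.println(seen);   // [a, b]
        System.out.println(it.log); // [hasNext, next, hasNext, next, hasNext]
    }
}
```

Even though the calling code never mentions next(), the log shows it being invoked once per element, which is why the forEachRemaining() call in the question ends up blocking on the upstream publisher.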