Search code examples
javaspring-bootspring-webfluxproject-reactorflux

Subscribe to flux from inside subscribe in Spring webFlux java


I have written a logic using spring reactor library to get all operators and then all devices for each operator (paginated) in async mode.

Created a flux to get all operator and then subscribing to it.

    final Flux<List<OperatorDetails>> operatorDetailsFlux = reactiveResourceProvider.getOperators();
    operatorDetailsFlux
        .subscribe(operatorDetailsList -> {
          for (final OperatorDetails operatorDetails : operatorDetailsList) {
            getAndCacheDevicesForOperator(operatorDetails.getId());
          }
        });

Now, for each operator I'm fetching the devices which requires multiple subscriptions to get device mono which gets all pages async by subscribing to the MONO.

private void getAndCacheDevicesForOperator(final int operatorId) {
    Mono<DeviceListResponseEntity> deviceListResponseEntityMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
        operatorId, 0);

    deviceListResponseEntityMono.subscribe(deviceListResponseEntity -> {
      final PaginatedResponseEntity PaginatedResponseEntity = deviceListResponseEntity.getData();
      final long totalDevicesInOperator = PaginatedResponseEntity.getTotalCount();


      int deviceCount = PaginatedResponseEntity.getCount();
      while (deviceCount < totalDevicesInOperator) {
        final Mono<DeviceListResponseEntity> deviceListResponseEntityPageMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
            operatorId, deviceCount);

        deviceListResponseEntityPageMono.subscribe(deviceListResponseEntityPage -> {
          final List<DeviceDetails> deviceDetailsList = deviceListResponseEntityPage.getData()
              .getItems();
          // work on devices
        });

        deviceCount += DEVICE_PAGE_SIZE;
      }
    });
  }

This code works fine. But my question is it a good idea to subscribe to mono from inside subscribe?


Solution

  • I broke it down to two flows 1st getting all operators and then getting all devices for each operator.

    For pagination I'm using Flux.expand to extract all pages.

    public Flux<OperatorDetails> getAllOperators() {
      return getOperatorsMonoWithRetryAndErrorSpec(0)
          .expand(paginatedResponse -> {
            final PaginatedEntity operatorDetailsPage = paginatedResponse.getData();
            if (morePagesAvailable(operatorDetailsPage) {
              return getOperatorsMonoWithRetryAndErrorSpec(operatorDetailsPage.getOffset() + operatorDetailsPage.getCount());
            }
            return Mono.empty();
          })
          .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
          .subscribeOn(apiScheduler);
    }
    
    
    public Flux<Device> getAllDevices(final int opId, final int offset) {
      return getConnectedDeviceMonoWithRetryAndErrorSpec(opId, offset)
          .expand(paginatedResponse -> {
            final PaginatedEntity deviceDetailsPage = paginatedResponse.getData();
            if (morePagesAvailabile(deviceDetailsPage)) {
              return getConnectedDeviceMonoWithRetryAndErrorSpec(opId,
                  deviceDetailsPage.getOffset() + deviceDetailsPage.getCount());
            }
            return Mono.empty();
          })
          .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
          .subscribeOn(apiScheduler);
    }
    
    

    Finally I'm creating a pipeline and subscribing to it to trigger the pipeline.

    operatorDetailsFlux
        .flatMap(operatorDetails -> {
            return reactiveResourceProvider.getAllDevices(operatorDetails.getId(), 0);
        })
        .subscribe(deviceDetails -> {
          // act on devices
        });