I have written some logic using the Spring Reactor library to fetch all operators and then, for each operator, all of its devices (paginated), asynchronously.
I create a Flux that emits the operators and then subscribe to it:
final Flux<List<OperatorDetails>> operatorDetailsFlux = reactiveResourceProvider.getOperators();
operatorDetailsFlux
    .subscribe(operatorDetailsList -> {
        for (final OperatorDetails operatorDetails : operatorDetailsList) {
            getAndCacheDevicesForOperator(operatorDetails.getId());
        }
    });
Next, for each operator I fetch its devices. This needs nested subscriptions: I subscribe to the Mono for the first page, and inside that callback I subscribe to one more Mono per remaining page so that the pages are fetched asynchronously.
private void getAndCacheDevicesForOperator(final int operatorId) {
    // First page (offset 0) tells us how many devices the operator has in total.
    Mono<DeviceListResponseEntity> deviceListResponseEntityMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
        operatorId, 0);
    deviceListResponseEntityMono.subscribe(deviceListResponseEntity -> {
        final PaginatedResponseEntity paginatedResponseEntity = deviceListResponseEntity.getData();
        final long totalDevicesInOperator = paginatedResponseEntity.getTotalCount();
        int deviceCount = paginatedResponseEntity.getCount();
        // One nested subscription per remaining page.
        while (deviceCount < totalDevicesInOperator) {
            final Mono<DeviceListResponseEntity> deviceListResponseEntityPageMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
                operatorId, deviceCount);
            deviceListResponseEntityPageMono.subscribe(deviceListResponseEntityPage -> {
                final List<DeviceDetails> deviceDetailsList = deviceListResponseEntityPage.getData()
                    .getItems();
                // work on devices
            });
            deviceCount += DEVICE_PAGE_SIZE;
        }
    });
}
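The page arithmetic in the loop above can be checked in isolation. This is only a minimal plain-Java sketch (no Reactor involved), and the names `PageOffsets` and `remainingOffsets` are hypothetical, not part of my code. It lists the offsets the loop would request once the first page has returned `firstPageCount` items:

```java
import java.util.ArrayList;
import java.util.List;

class PageOffsets {
    // Offsets of the pages still to be requested after the first page.
    // Mirrors the while loop: start at the first page's item count and
    // advance by the page size until the total is covered.
    static List<Integer> remainingOffsets(int firstPageCount, long totalCount, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<Integer> offsets = new ArrayList<>();
        int offset = firstPageCount;
        while (offset < totalCount) {
            offsets.add(offset);
            offset += pageSize;
        }
        return offsets;
    }

    public static void main(String[] args) {
        // 250 devices, pages of 100, first page already fetched
        System.out.println(remainingOffsets(100, 250, 100)); // prints [100, 200]
    }
}
```

Note that the offsets only line up with page boundaries if the first page comes back full, i.e. if its count equals the page size whenever more devices remain.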
This code works fine, but my question is: is it a good idea to subscribe to a Mono from inside another subscribe?
I then broke it down into two flows: the first gets all operators, and the second gets all devices for each operator.
For pagination I'm using the expand operator to walk through all pages.
public Flux<OperatorDetails> getAllOperators() {
    return getOperatorsMonoWithRetryAndErrorSpec(0)
        .expand(paginatedResponse -> {
            final PaginatedEntity operatorDetailsPage = paginatedResponse.getData();
            if (morePagesAvailable(operatorDetailsPage)) {
                // Next offset = current offset + number of items in this page.
                return getOperatorsMonoWithRetryAndErrorSpec(operatorDetailsPage.getOffset() + operatorDetailsPage.getCount());
            }
            // An empty Mono ends the expansion.
            return Mono.empty();
        })
        .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
        .subscribeOn(apiScheduler);
}
public Flux<Device> getAllDevices(final int opId, final int offset) {
    return getConnectedDeviceMonoWithRetryAndErrorSpec(opId, offset)
        .expand(paginatedResponse -> {
            final PaginatedEntity deviceDetailsPage = paginatedResponse.getData();
            if (morePagesAvailable(deviceDetailsPage)) {
                return getConnectedDeviceMonoWithRetryAndErrorSpec(opId,
                    deviceDetailsPage.getOffset() + deviceDetailsPage.getCount());
            }
            return Mono.empty();
        })
        .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
        .subscribeOn(apiScheduler);
}
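To make the control flow of the expand-based pagination easier to follow, here is a rough plain-Java model of it. It is a sketch under stated assumptions, not Reactor itself: it runs synchronously, it only covers the linear case where each page leads to at most one next page, and `ExpandModel`, `Page`, `fetchPage` and `fetchAll` are hypothetical stand-ins for my real types and API calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class ExpandModel {
    // Hypothetical stand-in for one paginated response.
    static final class Page {
        final int offset;
        final List<String> items;
        final int totalCount;

        Page(int offset, List<String> items, int totalCount) {
            this.offset = offset;
            this.items = items;
            this.totalCount = totalCount;
        }

        boolean morePagesAvailable() {
            return offset + items.size() < totalCount;
        }
    }

    // Linear model of expand: emit the seed, then keep applying the
    // expander to the latest element until it returns empty.
    static <T> List<T> expand(T seed, Function<T, Optional<T>> expander) {
        List<T> emitted = new ArrayList<>();
        Optional<T> current = Optional.of(seed);
        while (current.isPresent()) {
            emitted.add(current.get());
            current = expander.apply(current.get());
        }
        return emitted;
    }

    // Hypothetical in-memory "API": up to pageSize items starting at offset.
    static Page fetchPage(List<String> all, int offset, int pageSize) {
        int end = Math.min(offset + pageSize, all.size());
        return new Page(offset, new ArrayList<>(all.subList(offset, end)), all.size());
    }

    static List<String> fetchAll(List<String> all, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<Page> pages = expand(fetchPage(all, 0, pageSize), page -> {
            if (page.morePagesAvailable()) {
                return Optional.of(fetchPage(all, page.offset + page.items.size(), pageSize));
            }
            return Optional.empty(); // plays the role of Mono.empty()
        });
        // Flatten pages into items, like flatMap(fromIterable(...)) above.
        List<String> result = new ArrayList<>();
        for (Page page : pages) {
            result.addAll(page.items);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> devices = Arrays.asList("d1", "d2", "d3", "d4", "d5");
        System.out.println(fetchAll(devices, 2)); // prints [d1, d2, d3, d4, d5]
    }
}
```

The point of the model is that the next offset is derived from the page that was just received, so there is no need to subscribe manually inside a callback to request the following page.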
Finally, I build a single pipeline and subscribe to it once to trigger it:
operatorDetailsFlux
    .flatMap(operatorDetails -> {
        return reactiveResourceProvider.getAllDevices(operatorDetails.getId(), 0);
    })
    .subscribe(deviceDetails -> {
        // act on devices
    });
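The shape of this final pipeline (one inner sequence of devices per operator, flattened into a single sequence) can be mirrored with `java.util.stream`. This is only an analogy with hypothetical names (`PipelineShape`, `allDevices`) and in-memory data: the stream version is sequential and ordered, whereas Reactor's flatMap subscribes to the inner publishers eagerly and may interleave their results.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PipelineShape {
    // Hypothetical in-memory data: operator id -> device names.
    // For each operator, produce its devices and flatten everything
    // into one list, like operatorDetailsFlux.flatMap(...) above.
    static List<String> allDevices(Map<Integer, List<String>> devicesByOperator) {
        return devicesByOperator.keySet().stream()
            .flatMap(operatorId -> devicesByOperator.get(operatorId).stream())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> data = new LinkedHashMap<>();
        data.put(1, Arrays.asList("a1", "a2"));
        data.put(2, Arrays.asList("b1"));
        System.out.println(allDevices(data)); // prints [a1, a2, b1]
    }
}
```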