I have a Spring webflux app with the below method.
@Override
public Mono<Integer> updateSetting(int orgId, IntegrationDto dto,
Map<String, Object> jsonMap) {
return retrieveServices(dto.getClientId()).flatMap(services -> {
jsonMap.put("service", services);
return categoryRepository.findCategoryIdCountByName("test", orgId)
.flatMap(categoryIdCount -> {
final ServiceDto serviceInput = new ServiceDto();
if (categoryIdCount == 0) {
return inventoryCategoryRepository.save(InventoryCategory.of("test", orgId))
.flatMap(category -> {
return saveServices(serviceInput, orgId, jsonMap,
category.getCategoryId());
});
} else {
// Some Logc here ...
}
});
}).onErrorResume(e -> {
if (e instanceof WebClientResponseException) {
int statusCode = ((WebClientResponseException) e).getRawStatusCode();
throw new LabServiceException("Unable to connect to the service !", statusCode);
}
throw new ServiceException("Error connecting to the service !");
});
}
private Mono<Services> retrieveServices(final String clientId) {
return webClient.get().uri(props.getBaseUrl() + "/api/v1/services")
.retrieve().bodyToMono(Services.class);
}
private Mono<Integer> saveInventories(ServiceInput serviceInput, int orgId, Map<String, Object> jsonMap,
Long categoryId) {
return refreshInventories(serviceInput, orgId, categoryId).flatMap(reponse -> {
return updateSetting(branchId, jsonMap);
});
}
private Mono<Integer> refreshInventories(ServiceInput serviceInput, int orgId, Long categoryId) {
return inventoryRepository.findAllCodesByTypeBranchId(branchId).collectList().flatMap(codes -> {
return retrieveAvailableServices(Optional.of(serviceInput), categoryId).flatMap(services -> {
List<Inventory> inventories = services.stream()
.filter(inventory -> !codes.contains(inventory.getCode()))
.map(inventoryDto -> toInventory(inventoryDto, branchId)).collect(Collectors.toList());
if (inventories.size() > 0) {
return saveAllInventories(inventories).flatMap(response -> {
return Mono.just(orgId);
});
} else {
return Mono.just(orgId);
}
});
});
}
Here, the updateSettig public method is being invoked from a REST call and all gets executed as expected.
Now, I want to execute the same with a different flow as well like a scheduler.
When I invoke from a scheduler also, It works.
updateSetting(orgId, dto, jsonMap).subscribe();
But, I want to wait until the updateSetting gets executed.
So, tried with the code below.
updateSetting(orgId, dto, jsonMap).flatMap(response -> {
////
});
With the above code, updateSetting method gets invoked, but not getting into the retrieveServices.
return retrieveServices(dto.getClientId()).flatMap(services -> {
You always need to subscribe in the end. So your code should be:
updateSetting(orgId, dto, jsonMap).flatMap(response -> {
////
}).subscribe();