I'm new to reactive programming. I have to create a API call to search-leave-requests. I have to filter leave request by status(PENDING,APPROVED,REJECTED) and logged user role(HR,AHR,RM) and if role is RM(reporting Manager) I have to query from reporting_manager_id and status. If logged user is HR or AHR only need to query from status.
Controller
@GetMapping(value = "/search-leave-requests")
public Mono<List<ViewLeaveRequestsDto>> searchLeaveRequests(@RequestParam("status") String status,
@RequestParam(value = "page", defaultValue = "1") int page,
@RequestParam(value = "size", defaultValue = "9") int size,
@RequestParam(value = "reporting_manager_id") Optional<Long> reporting_manager_id,
@RequestParam(value = "role") String role) {
long startTime = System.currentTimeMillis();
LOGGER.info("searchLeaveRequestsRequest : status={}", status);
return leaveRequestService.searchLeaveRequests(status, page, size, reporting_manager_id, role).map(
response -> {
LOGGER.info("searchLeaveRequestsResponse : timeTaken={}|response={}",
CommonUtil.getTimeTaken(startTime),
CommonUtil.convertToString(response));
return response;
});
}
Service
public interface LeaveRequestService {
public Mono<List<ViewLeaveRequestsDto>> searchLeaveRequests(String status, int page, int size,
Optional<Long> reporting_manager_id, String role);
ServiceImpl
@Override
public Mono<List<ViewLeaveRequestsDto>> searchLeaveRequests(String status, int page, int size, Optional<Long> reporting_manager_id, String role) {
Flux<LeaveRequest> leaveRequestByStatus = leaveRequestRepository.findByStatus(status)
.switchIfEmpty(
Mono.error(new LmsException(ErrorCode.NO_DATA_FOUND, "No Data matching for given Status")))
// .skip((page - 1) * size)
// .limitRequest(size)
.doOnNext(leaveRequest1 -> {
LOGGER.info("searchLeaveRequestsResponse | {}", leaveRequest1);
});
/* */
// AtomicLong sizeOfFlux = new AtomicLong();
// leaveRequestByStatus.subscribe(object -> sizeOfFlux.incrementAndGet());
List<ViewLeaveRequestsDto> dtoList = new ArrayList<>();
Mono<List<ViewLeaveRequestsDto>> dtoListMono = leaveRequestByStatus.flatMap(entity -> {
// for (int i = 0; i < sizeOfFlux.get(); i++) {
ViewLeaveRequestsDto dto = new ViewLeaveRequestsDto();
dto.setLeave_request_id(entity.getId());
dto.setLeave_request_date(entity.getFromDate());
Mono<LeaveType> leaveType = leaveTypeRepository.findById(entity.getLeaveTypeId());
leaveType.subscribe(s -> dto.setLeave_type(s.getTypeName()));
dto.setFrom_date(entity.getFromDate());
dto.setTo_date(entity.getToDate());
dto.setNumber_of_days(entity.getDaysCount());
dto.setReason(entity.getLeaveReason());
dto.setStatus(entity.getStatus().toString());
dto.setAttachment(entity.getAttachment());
dto.setDay_type(entity.getDateType());
dto.setHalf_day_type(entity.getHalfDayType());
dto.setCovering_employee(entity.getCoveringEmployeeId().toString());
dto.setReporting_manager(entity.getReportingManagerId().toString());
// TODO set profile_image, designation and total_leave_available
dtoList.add(dto);
// }
return Flux.fromIterable(dtoList);
}).collectList();
return dtoListMono;
}
Repository
@Repository
public interface LeaveRequestRepository extends ReactiveCrudRepository<LeaveRequest, Long> {
Flux<LeaveRequest> findByStatus(String status);
Flux<LeaveRequest> findByStatusAndReportingManagerId(String status, Optional<Long> reporting_manager_id);
}
LeaveRequest Entity
@Table(name = "leave_request")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class LeaveRequest {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
@Column("id")
private Long id;
@Column("employee_id")
private Long employeeId;
@Column("date_type")
private Boolean dateType;
@Column("half_day_type")
private Boolean halfDayType;
@Column("leave_request_date")
private LocalDate leaveRequestDate;
@Column("from_date")
private LocalDate fromDate;
@Column("to_date")
private LocalDate toDate;
@Column("leave_reason")
private String leaveReason;
@Column("attachment")
private String attachment;
@Column("days_count")
private Integer daysCount;
@Enumerated(EnumType.STRING)
private Status status;
@Column("reporting_manager_id")
private Long reportingManagerId;
@Column("covering_employee_id")
private Long coveringEmployeeId;
@Column("leave_type_id")
private Long leaveTypeId;
}
ViewLeaveRequestsDto
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ViewLeaveRequestsDto {
private Long leave_request_id;
private String profile_image;
private String designation;
private String requester_name;
private LocalDate leave_request_date;
private String leave_type;
private Boolean day_type;
private Boolean half_day_type;
private String covering_employee;
private LocalDate from_date;
private LocalDate to_date;
private Integer number_of_days;
private String reason;
private Integer total_leave_available;
private String attachment;
private String reporting_manager;
private String status;
}
When I run this code, I got duplicate records. I have only 3 records in my DB. But in postman response I got many records. Any idea How to fix this ?
PS: My java version - 18 spring-boot version - 2.7
It's worth to take into account that you're mixing the imperative-style code with functional-style code here, which is incorrect way to use reactive.
First: you're calling subscribe() inside the method of your service. Avoid this.
Second: you have created List<ViewLeaveRequestsDto> dtoList = new ArrayList<>();
and you're adding elements in there from flatMap() function which is incorrect. flatMap() is async and concurrent by it's nature, so this approach can lead to problems with thread-safety and etc.
Third: why are you using Mono<List<T>>
in your controller if you can return Flux<T>
directly?
Having that said I think your code inside your service should look like:
return leaveRequestRepository.findByStatus(status)
.switchIfEmpty(
Mono.error(new LmsException(ErrorCode.NO_DATA_FOUND, "No Data matching for given Status")))
.doOnNext(leaveRequest1 -> {
LOGGER.info("searchLeaveRequestsResponse | {}", leaveRequest1);
})
.flatMap(entity -> leaveTypeRepository.findById(entity.getLeaveTypeId())
.map(leaveType -> {
// build your DTO using leaveType and fields from entity and return DTO
})
);
And change the signature of your method to return Flux of your DTOs and also change the signature of your controller method to return Flux
UPDATED
Shortly answering why you're getting dupplicates:
Let's say you have 3 rows/documents in your db.
You start streaming them from db with Flux, on each element you call .flatMap() and there you add element (updating your collection every flatMap() call) to your ArrayList. From this flatMap() you return Flux that is made from your ArrayList (updated ArrayList).
This flux "merges" to your "main" flux.
In the resulting list (.collectList()
)you will get:
1 element + 2 elements + 3 elements = 6 elements in summary
So as I said above, you're handling incorrectly with your initial Flux.
UPDATED_2
I think I figured out why you want to return Mono<List<DTO>>
instead of Flux<DTO>
, I noticed that in your controller you call .map()
to measure the total time taken to collect list of your DTOs.
Two things:
If for some reason you want to return Mono<List<DTO>>
, then it's not a problem, in my code (inside your service) just call .collectList()
in the end, so you'll return Mono<List<DTO>>
, but you should know that this approach forces to collect all the results in memory in time.
.map()
operator semantically assumes that you want to map the given object to some another or to change something within that object.
If you want to just log something using this object or do another kind of things (some side effects), then use, for example, doOnNext()
operator