I have this class:
@Slf4j
@RequiredArgsConstructor
@Service
public class SyncTransactionService {

    private final SyncProducerService syncProducerService; // kafka producer
    private final CouponService couponService; // db persistence service
    private final CouponUpdateMessageMapper mapper; // simple mapper to generate message dto for Kafka

    public void processChanges(List<Change> changes) {
        Flux.fromIterable(changes)
            .map(this::processAndSend)
            .doOnError(e -> log.error("Cannot sync coupon with change. ", e))
            .subscribeOn(Schedulers.elastic())
            .subscribe();
    }

    private Mono<CouponUpdateMessage> processAndSend(Change change) {
        return Mono.fromCallable(() -> change)
            .doFirst(() -> log.info("saving or deleting the coupon: {}", change.getChanged()))
            .map(this::saveOrDelete)
            .thenReturn(mapper.map(change))
            .doOnSuccess(message -> log.info("sending message: {}", message))
            .doOnSuccess(syncProducerService::send);
    }

    private Mono<Void> saveOrDelete(Change change) {
        if (change.getType() == DELETE) return couponService.deleteCoupon(change.getChanged());
        else return couponService.saveCoupon(change.getChanged()).then();
    }
}
And this test:
@ExtendWith(MockitoExtension.class)
class SyncTransactionServiceTest {

    @Mock
    private SyncProducerService syncProducerService;
    @Mock
    private CouponService couponService;
    @Mock
    private CouponUpdateMessageMapper mapper;
    @InjectMocks
    private SyncTransactionService syncTransactionService;

    private static Coupon insertId1;
    private static Coupon updateId2;
    private static Coupon deleteId3;
    private static Change change1;
    private static Change change2;
    private static Change change3;

    @BeforeAll
    private static void prepareData() {
        insertId1 = DataHelper.coupon();
        updateId2 = DataHelper.coupon();
        updateId2.setId(2);
        deleteId3 = DataHelper.coupon();
        deleteId3.setId(3);
        change1 = Change.builder().changed(insertId1).type(CouponUpdateType.INSERT).build();
        change2 = Change.builder().changed(updateId2).type(CouponUpdateType.UPDATE).build();
        change3 = Change.builder().changed(deleteId3).type(CouponUpdateType.DELETE).build();
    }

    @Test
    void shouldProcessChanges() {
        // given
        List<Change> changes = List.of(change1, change2, change3);
        when(couponService.saveCoupon(insertId1)).thenReturn(Mono.just(insertId1));
        when(couponService.saveCoupon(updateId2)).thenReturn(Mono.just(updateId2));
        when(couponService.deleteCoupon(deleteId3)).thenReturn(Mono.empty());
        doNothing().when(syncProducerService).send(any());
        doCallRealMethod().when(mapper).map(any());

        // when
        syncTransactionService.processChanges(changes);

        // then
        verify(couponService, times(2)).saveCoupon(any());
        verify(mapper, times(3)).map(any());
        verify(couponService).deleteCoupon(any());
        verify(syncProducerService, times(3)).send(any());
    }
}
When I run the test, Mockito.verify() does not detect any interaction with the mocks, although I have subscribe() in the code.
So what could be the problem in my pipeline?
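The problem is that your method under test runs asynchronously because of the specified scheduler, so the test reaches the verify() calls before the pipeline has run on the scheduler's thread. Return the Flux from the method under test instead of subscribing inside it, and then either use StepVerifier or call collectList() and block() on the Flux to trigger execution and wait for it to finish.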
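To make that concrete, here is a minimal sketch of the "return the Flux and wait for it" approach. It only needs reactor-core. The class name ReturnFluxSketch, the String-based changes, and the `sent` list (a stand-in for the Kafka producer) are hypothetical placeholders, not your real types. Two details in the sketch are my own assumptions rather than part of the advice above: it uses flatMap and then(...) instead of map, because map only wraps the inner Mono without ever subscribing to it, and it uses Schedulers.boundedElastic() because Schedulers.elastic() is deprecated in newer Reactor releases.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReturnFluxSketch {

    // Hypothetical stand-in for the Kafka producer: records every "sent" message.
    static final List<String> sent = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for couponService.saveCoupon(...) / deleteCoupon(...).
    static Mono<Void> saveOrDelete(String change) {
        return Mono.fromRunnable(() -> { /* pretend to persist the change */ });
    }

    static Mono<String> processAndSend(String change) {
        // then(...) subscribes to saveOrDelete first and only afterwards
        // builds and emits the message, so the persistence step really runs.
        return saveOrDelete(change)
                .then(Mono.fromCallable(() -> "message:" + change))
                .doOnNext(sent::add);
    }

    // Returns the Flux instead of calling subscribe() internally,
    // so the caller decides when to subscribe and can wait for completion.
    static Flux<String> processChanges(List<String> changes) {
        return Flux.fromIterable(changes)
                .flatMap(ReturnFluxSketch::processAndSend) // flatMap subscribes to each inner Mono
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        // block() subscribes and waits until the pipeline has completed,
        // so the assertions that follow see all side effects.
        List<String> messages = processChanges(List.of("insert-1", "update-2", "delete-3"))
                .collectList()
                .block();
        System.out.println(messages.size() + " messages, " + sent.size() + " sent");
    }
}
```

In your test, the same idea would be to call processChanges(changes).collectList().block() before the verify() calls, or, with reactor-test on the classpath, StepVerifier.create(syncTransactionService.processChanges(changes)).expectNextCount(3).verifyComplete().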