Tags: spring-boot, mockito, reactive-programming, spring-webflux

Reactive pipeline is subscribed, but Mockito.verify() fails in a unit test (no mock invocations are recorded)


I have this class:

@Slf4j
@RequiredArgsConstructor
@Service
public class SyncTransactionService {
    private final SyncProducerService syncProducerService; // kafka producer
    private final CouponService couponService; // db persistence service
    private final CouponUpdateMessageMapper mapper; // simple mapper to generate message dto for Kafka

    public void processChanges(List<Change> changes) {
        Flux.fromIterable(changes)
                .map(this::processAndSend)
                .doOnError(e -> log.error("Cannot sync coupon with change. ", e))
                .subscribeOn(Schedulers.elastic())
                .subscribe();
    }

    private Mono<CouponUpdateMessage> processAndSend(Change change) {
        return Mono.fromCallable(() -> change)
                .doFirst(() -> log.info("saving or deleting the coupon: {}", change.getChanged()))
                .map(this::saveOrDelete)
                .thenReturn(mapper.map(change))
                .doOnSuccess(message -> log.info("sending message: {}", message))
                .doOnSuccess(syncProducerService::send);
    }

    private Mono<Void> saveOrDelete(Change change) {
        if (change.getType() == DELETE) return couponService.deleteCoupon(change.getChanged());
        else return couponService.saveCoupon(change.getChanged()).then();
    }

}

And this test:

@ExtendWith(MockitoExtension.class)
class SyncTransactionServiceTest {
    @Mock
    private SyncProducerService syncProducerService;

    @Mock
    private CouponService couponService;

    @Mock
    private CouponUpdateMessageMapper mapper;

    @InjectMocks
    private SyncTransactionService syncTransactionService;


    private static Coupon insertId1;
    private static Coupon updateId2;
    private static Coupon deleteId3;

    private static Change change1;
    private static Change change2;
    private static Change change3;


    @BeforeAll
    static void prepareData() { // JUnit 5 lifecycle methods must not be private
        insertId1 = DataHelper.coupon();
        updateId2 = DataHelper.coupon();
        updateId2.setId(2);
        deleteId3 = DataHelper.coupon();
        deleteId3.setId(3);

        change1 = Change.builder().changed(insertId1).type(CouponUpdateType.INSERT).build();
        change2 = Change.builder().changed(updateId2).type(CouponUpdateType.UPDATE).build();
        change3 = Change.builder().changed(deleteId3).type(CouponUpdateType.DELETE).build();
    }

    @Test
    void shouldProcessChanges() {
        // given
        List<Change> changes = List.of(change1, change2, change3);
        when(couponService.saveCoupon(insertId1)).thenReturn(Mono.just(insertId1));
        when(couponService.saveCoupon(updateId2)).thenReturn(Mono.just(updateId2));
        when(couponService.deleteCoupon(deleteId3)).thenReturn(Mono.empty());
        doNothing().when(syncProducerService).send(any());
        doCallRealMethod().when(mapper).map(any());

        // when
        syncTransactionService.processChanges(changes);

        // then
        verify(couponService, times(2)).saveCoupon(any());
        verify(mapper, times(3)).map(any());
        verify(couponService).deleteCoupon(any());
        verify(syncProducerService, times(3)).send(any());
    }
}

When I run the test, Mockito.verify() does not detect any interaction with the mocks, even though the pipeline calls subscribe().

So what could be the problem in my pipeline?


Solution

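  • The method under test runs asynchronously because of the scheduler passed to subscribeOn(). processChanges() returns as soon as the subscription is scheduled, so verify() can run before the pipeline has done any work on the other thread. Return the Flux from the method under test instead of subscribing inside it, then in the test either use StepVerifier or call collectList().block() on the Flux to trigger the pipeline and wait for it to complete.
  • Note also that map(this::processAndSend) and map(this::saveOrDelete) only create inner Monos that nothing subscribes to, so saveCoupon(), deleteCoupon() and send() would still not run even once the test waits. Using flatMap in both places subscribes to the inner Monos.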
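
The approach above could look roughly like the following minimal sketch. It is not the original SyncTransactionService: the class name ReturnFluxSketch, the String stand-ins for Change and CouponUpdateMessage, and the calls list that plays the role of the Mockito mocks are all assumptions, chosen so the example runs with only reactor-core on the classpath. It uses flatMap rather than map so that the inner Monos are actually subscribed, and Schedulers.boundedElastic() stands in for the Schedulers.elastic() of the question, which later Reactor releases deprecate.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ReturnFluxSketch {
    // Hypothetical stand-in for the mocks: records every collaborator call.
    final List<String> calls = new CopyOnWriteArrayList<>();

    // Stand-in for couponService.saveCoupon(...).then(): lazy, runs only on subscription.
    Mono<Void> save(String coupon) {
        return Mono.fromRunnable(() -> calls.add("save:" + coupon));
    }

    // Stand-in for processAndSend: persist first, then emit and "send" the message.
    Mono<String> processAndSend(String coupon) {
        return save(coupon)
                .thenReturn("message:" + coupon)
                .doOnSuccess(message -> calls.add("send:" + message));
    }

    // Returns the Flux instead of calling subscribe(), so the caller decides
    // when to subscribe and can wait for completion.
    Flux<String> processChanges(List<String> coupons) {
        return Flux.fromIterable(coupons)
                .flatMap(this::processAndSend) // flatMap subscribes to each inner Mono
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        ReturnFluxSketch service = new ReturnFluxSketch();
        // block() subscribes and waits until the asynchronous pipeline has completed.
        List<String> messages = service.processChanges(List.of("a", "b", "c"))
                .collectList()
                .block();
        System.out.println(messages.size() + " messages, " + service.calls.size() + " recorded calls");
    }
}
```

In the real test the same shape would let the verify() calls run only after syncTransactionService.processChanges(changes).collectList().block() has returned, or after a StepVerifier has confirmed completion.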