I'm trying to unit test an advice on the poller that blocks execution of the Mongo channel adapter until a certain condition is met (that is, until all messages from the current batch have been processed).
The flow looks as follows:
new Query().with(Sort.by(Sort.Direction.DESC, "modifiedDate")).limit(1))
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(pollingIntervalSeconds))
.handle((p, h) -> {
return p;
And the following advice bean:
public WaitUntilCompletedAdvice waitUntilCompletedAdvice() {
DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(Duration.ofSeconds(1));
return new WaitUntilCompletedAdvice(trigger);
And the advice itself:
public class WaitUntilCompletedAdvice extends SimpleActiveIdleMessageSourceAdvice {
AtomicBoolean wait = new AtomicBoolean(false);
public WaitUntilCompletedAdvice(DynamicPeriodicTrigger trigger) {
public boolean beforeReceive(MessageSource<?> source) {
if (getWait())
return false;
return true;
public boolean getWait() {
return wait.get();
public void setWait(boolean newWait) {
if (getWait() == newWait)
while (true) {
if (wait.compareAndSet(!newWait, newWait)) {
I'm using the following test to exercise the flow:
public void testClaimPoollingAdapterFlow() throws Exception {
// given
ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
CountDownLatch receiveLatch = new CountDownLatch(1);
MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
this.mockIntegrationContext.substituteMessageHandlerFor("retrieveDocumentHeader", mockMessageHandler);
LocalDateTime modifiedDate = LocalDateTime.now();
ProcessingMetadata data = Metadata.builder()
assert !this.advices.waitUntilCompletedAdvice().getWait();
// when
itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));
// then
assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
assert this.advices.waitUntilCompletedAdvice().getWait();
This works fine, but when I send another message to the input channel, the message is still processed without respecting the advice.
Is this the intended behaviour? If so, how can I verify in a unit test that the poller is really waiting for this advice?
itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));
That bypasses the poller and sends the message directly.
You can unit test that the advice has been configured by calling beforeReceive() from your test.
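To illustrate the idea of calling beforeReceive() directly, here is a minimal sketch in plain Java. WaitGate is a hypothetical stand-in for WaitUntilCompletedAdvice: it does not extend any Spring Integration class and its beforeReceive() takes no MessageSource argument, so treat it only as a model of the flag logic, not as the real advice.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for WaitUntilCompletedAdvice (assumption: no Spring types involved).
class WaitGate {
    private final AtomicBoolean wait = new AtomicBoolean(false);

    // Mirrors the advice's contract: false means "skip this poll".
    boolean beforeReceive() {
        return !wait.get();
    }

    boolean getWait() {
        return wait.get();
    }

    void setWait(boolean newWait) {
        wait.set(newWait);
    }
}

class WaitGateCheck {
    public static void main(String[] args) {
        WaitGate gate = new WaitGate();
        System.out.println(gate.beforeReceive()); // true: the poll is allowed
        gate.setWait(true);
        System.out.println(gate.beforeReceive()); // false: the poll is skipped
    }
}
```

In a real test you would presumably call beforeReceive(...) on the actual advice bean, before and after setWait(true), and assert on the returned boolean in the same way.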
Or you can create a dummy test flow with the same advice
IntegrationFlows.from(() -> "foo", e -> e.poller(...))
And verify that just one message is sent.
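As a rough model of why the dummy flow delivers only one message while a direct send still goes through, here is a plain-Java sketch. GatedPollerSketch, poll and sendDirect are hypothetical names invented for this illustration; the loop only imitates a poller that consults a wait flag before each receive and is not Spring Integration code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of a polled source guarded by a wait flag (assumption: no Spring types).
class GatedPollerSketch {
    final AtomicBoolean wait = new AtomicBoolean(false);

    // Runs the given number of poll attempts; the source is read only while the gate is open.
    int poll(int cycles, Supplier<String> source, Consumer<String> handler) {
        int delivered = 0;
        for (int i = 0; i < cycles; i++) {
            if (wait.get()) {
                continue; // models beforeReceive() returning false
            }
            handler.accept(source.get());
            delivered++;
        }
        return delivered;
    }

    // Models channel.send(...): the handler is invoked without consulting the gate.
    void sendDirect(String payload, Consumer<String> handler) {
        handler.accept(payload);
    }

    public static void main(String[] args) {
        GatedPollerSketch sketch = new GatedPollerSketch();
        List<String> received = new ArrayList<>();
        // The handler closes the gate as soon as it sees a message.
        Consumer<String> handler = payload -> {
            received.add(payload);
            sketch.wait.set(true);
        };
        int delivered = sketch.poll(5, () -> "foo", handler);
        System.out.println(delivered);       // 1: later polls were skipped
        sketch.sendDirect("bar", handler);
        System.out.println(received.size()); // 2: the direct send bypassed the gate
    }
}
```

The same shape applies to the real test: drive messages through the polled source rather than the input channel, then assert that exactly one message reached the handler while the advice is waiting.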