Spring Integration - Poller fires too often


I have the following code:

@Bean
  public IntegrationFlow aggregatingFlow(
      AmqpInboundChannelAdapter aggregatorInboundAdapter,
      PollableChannel aggregatingChannel,
      AmqpOutboundEndpointEnhanced amqpOutboundEndpoint,
      PollSkipAdvice pollSkipAdvice) {
    return IntegrationFlows.from(aggregatorInboundAdapter)
        .wireTap(wtChannel())
        .channel(aggregatingChannel)
        .handle(
            amqpOutboundEndpoint,
            e ->
                e.poller(
                        Pollers.fixedDelay(1, TimeUnit.SECONDS, 1)
                            .maxMessagesPerPoll(DEFAULT_MESSAGES_PER_POLL)
                            .advice(pollSkipAdvice))
                    .id("pollingConsumer"))
        .get();
  }

where pollSkipAdvice is defined as:

@Bean
  public PollSkipAdvice pollSkipAdvice() {
    return new PollSkipAdvice(
        new PollSkipStrategy() {
          int currentPoll = 1;

          @Override
          public synchronized boolean skipPoll() {
            int hitRate = 0; // hard-coded here to keep the example short; the real value is dynamic
            if (currentPoll >= hitRate) {
              System.out.println(MessageFormat.format("{0} : Hit poll number {1} for message number of {2} | {3}",
                  DateTimeFormatter.ofPattern("HH:mm:ss. SSS")
                      .withZone(ZoneOffset.UTC)
                      .format(Instant.now()),
                  currentPoll,
                  currentMessagesInQueue, Thread.currentThread().getName()));
              currentPoll = 1;
              return false;
            }
            currentPoll++;
            return true;
          }
        });
  }

The problem I am facing is that the poller is invoked more than once per second. This is the log I get when testing my code:

21:41:47. 652 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 654 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=190)
21:41:47. 656 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=191)
21:41:47. 658 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 660 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=192)
21:41:47. 662 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=193)
21:41:47. 665 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=194)
21:41:47. 667 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=195)
21:41:47. 669 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:47. 671 : Hit poll number 1 for message number of 0 | pool-3-thread-1
EventChange(id=196)
EventChange(id=197)
EventChange(id=198)
EventChange(id=199)
21:41:48. 653 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:49. 656 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:50. 659 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:51. 661 : Hit poll number 1 for message number of 0 | pool-3-thread-1
21:41:52. 662 : Hit poll number 1 for message number of 0 | pool-3-thread-1

So it seems that whenever I receive something from AMQP, the poller triggers many times. You can see in the log that from 21:41:47. 652 to 21:41:47. 671 it triggered 10 times within the same second. On the other hand, as soon as the burst is over, it works as it should (once per second).

When I print the stack trace for two of these invocations that are only a few milliseconds apart, I get this:

20:32:56. 406 : Hit poll number 2 for message number of 0 | pool-3-thread-1

java.lang.Thread.getStackTrace(Thread.java:1559)
com.a.configuration.component.EventAggregatorConfig$1.skipPoll(EventAggregatorConfig.java:313)
org.springframework.integration.scheduling.PollSkipAdvice.invoke(PollSkipAdvice.java:51)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
com.sun.proxy.$Proxy136.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

20:32:56. 408 : Hit poll number 1 for message number of 0 | pool-3-thread-1

java.lang.Thread.getStackTrace(Thread.java:1559)
com.a.configuration.component.EventAggregatorConfig$1.skipPoll(EventAggregatorConfig.java:313)
org.springframework.integration.scheduling.PollSkipAdvice.invoke(PollSkipAdvice.java:51)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
com.sun.proxy.$Proxy136.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

Did I configure it incorrectly? I tried to define a separate taskScheduler, but it does not seem to be used at all.

UPDATE

After a bit of analysis: is it possible that this is caused by the following part of Spring's AbstractPollingEndpoint class?

while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
    if (pollForMessage() == null) {
        break;
    }
    count++;
}

So basically, even if I say "poll every second, take up to 10 messages if they are there, and do nothing if there are none", it will keep calling pollForMessage() within one cycle until it finds nothing?

UPDATE2

Yes, it seems that the skipPoll advice is called for every single message poll, not once per batch of maxMessagesPerPoll.


Solution

  • I'm not sure what the question is. The logic is exactly what we would expect from this advice:

    private Runnable createPoller() {
        return () ->
                this.taskExecutor.execute(() -> {
                    int count = 0;
                    while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                        if (this.maxMessagesPerPoll == 0) {
                            logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                            break;
                        }
                        if (pollForMessage() == null) {
                            break;
                        }
                        count++;
                    }
                });
    }
    

    So, if pollForMessage() returns null, we just break out of that while() loop and exit the current polling cycle. The PollSkipAdvice produces that null when its PollSkipStrategy returns true from skipPoll(), that is, when the poll is skipped.
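To make the loop above concrete, here is a minimal plain-Java sketch of one polling cycle. It is not Spring Integration code: the class PollingCycleSketch, the strategyCalls counter, the Queue used as a channel and the BooleanSupplier used as the skip strategy are all stand-ins invented for illustration. It only shows that the strategy is consulted once per pollForMessage() call, not once per cycle.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Plain-Java stand-in for the polling loop; names here are hypothetical.
class PollingCycleSketch {

    // How many times the skip strategy was consulted.
    static int strategyCalls = 0;

    // One "real poll": consult the strategy first, then try to take one message.
    static String pollForMessage(Queue<String> channel, BooleanSupplier skipPoll) {
        strategyCalls++;
        if (skipPoll.getAsBoolean()) {
            return null; // skipped: the caller sees the same null as for an empty channel
        }
        return channel.poll(); // null when the channel is empty
    }

    // One polling cycle, i.e. one firing of the trigger.
    static int runCycle(Queue<String> channel, int maxMessagesPerPoll, BooleanSupplier skipPoll) {
        int count = 0;
        while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
            if (pollForMessage(channel, skipPoll) == null) {
                break;
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        BooleanSupplier neverSkip = () -> false;

        // Busy cycle: three messages are waiting.
        int handled = runCycle(channel, 10, neverSkip);
        System.out.println(handled + " messages, " + strategyCalls + " strategy calls");

        // Idle cycle: the channel is now empty.
        strategyCalls = 0;
        handled = runCycle(channel, 10, neverSkip);
        System.out.println(handled + " messages, " + strategyCalls + " strategy calls");
    }
}
```

The busy cycle prints "3 messages, 4 strategy calls" and the idle cycle prints "0 messages, 1 strategy calls". That matches the shape of the log in the question: a burst of strategy invocations while messages are arriving, then one invocation per second when the channel is empty.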

    You are probably confusing a polling cycle with an actual poll. A polling cycle starts when the trigger fires and covers up to maxMessagesPerPoll polls. An actual poll is a single pollForMessage() call, and the advice is applied to each of those calls. You might need to revise the logic of your custom strategy to fit the way this advice is applied.
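One possible way to revise the strategy, offered only as a sketch: make the skip decision once per cycle and repeat it for every further call in that cycle. The class below is hypothetical and is not part of Spring Integration. It guesses that a new cycle has begun whenever the gap since the previous skipPoll() call is at least cycleGapMillis. That guess only holds if handling one message takes less than this gap and the poller delay is longer than it (for the 1 second fixed delay in the question, something like 500 ms might do). In a real application the class would implement PollSkipStrategy, whose single method is boolean skipPoll(). The interface is left out here so the example compiles on its own, and the clock is injected so the logic can be exercised without waiting.

```java
import java.util.function.LongSupplier;

// Hypothetical cycle-aware strategy; names and the time-gap heuristic are assumptions.
class CycleAwareSkipStrategy {

    private final int hitRate;          // let every hitRate-th cycle through
    private final long cycleGapMillis;  // calls at least this far apart start a new cycle
    private final LongSupplier clock;   // time source in milliseconds

    private boolean started = false;
    private long lastCallMillis;
    private int cyclesSinceHit = 0;
    private boolean skipCurrentCycle = false;

    CycleAwareSkipStrategy(int hitRate, long cycleGapMillis, LongSupplier clock) {
        this.hitRate = hitRate;
        this.cycleGapMillis = cycleGapMillis;
        this.clock = clock;
    }

    // Same signature as PollSkipStrategy.skipPoll(): true means "skip this poll".
    public synchronized boolean skipPoll() {
        long now = clock.getAsLong();
        boolean newCycle = !started || now - lastCallMillis >= cycleGapMillis;
        started = true;
        lastCallMillis = now;
        if (newCycle) {
            // Decide once per cycle; later calls in the same cycle reuse the decision.
            cyclesSinceHit++;
            skipCurrentCycle = cyclesSinceHit < hitRate;
            if (!skipCurrentCycle) {
                cyclesSinceHit = 0;
            }
        }
        return skipCurrentCycle;
    }
}
```

With hitRate = 2, the first cycle is skipped, the second is let through for all of its polls, the third is skipped again, and so on. Passing System::currentTimeMillis as the clock would be the natural choice outside of tests.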