Tags: spring-integration, threadpoolexecutor, executor

inbound-channel-adapter not scaling based on executor


I have a jdbc:inbound-channel-adapter polling 50 records. I am trying to improve performance by scaling the pollerExecutor pool-size to 1-10, so that multiple threads can process 50 records each:

    <int-jdbc:inbound-channel-adapter
        id="initial.ContactType.poller"
        query="${poller.ContactType.get}"
        max-rows="${poller.deliveryContactType.maxRow:50}"
        row-mapper="ContactTypePollerRowMapper"
        data-source="dataSource" channel="ContactTypeChannel">
        <int:poller fixed-rate="3000" time-unit="MILLISECONDS" task-executor="pollerExecutor">
            <int:advice-chain>
                <ref bean="pollerLoggingAdvice"/>
                <ref bean="txAdvice"/>
            </int:advice-chain>
        </int:poller>
    </int-jdbc:inbound-channel-adapter>

    <task:executor id="pollerExecutor" pool-size="1-10"
        queue-capacity="0" rejection-policy="CALLER_RUNS" />

I tested this, and the time taken to process 100,000 records is the same irrespective of the pool-size.

I did three rounds of tests with pool-size=1, pool-size=1-3, and pool-size=1-10 respectively; in all three tests, the 100,000 records took 1 hour.

I confirmed by checking the logs that the pollerExecutor threads are not working in parallel: pollerExecutor-1 processes all 50 records before pollerExecutor-2 starts processing.

Why is the container/pollerExecutor not working in parallel?


Solution

  • I think your problem is here, in ThreadPoolTaskExecutor.setQueueCapacity():

    /**
     * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
     * Default is {@code Integer.MAX_VALUE}.
     * <p>Any positive value will lead to a LinkedBlockingQueue instance;
     * any other value will lead to a SynchronousQueue instance.
     * @see java.util.concurrent.LinkedBlockingQueue
     * @see java.util.concurrent.SynchronousQueue
     */
    public void setQueueCapacity(int queueCapacity) { ... }

    So, if you specify queue-capacity="0", you end up with a SynchronousQueue, which cannot accept a new task while the single thread is still busy processing those 50 records.

    Try some reasonable queue-capacity instead, to observe possible parallelism.
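
    A minimal sketch of what that could look like (the capacity of 10 here is an assumed value, to be tuned for your load):

        <!-- queue-capacity="10" is an assumption; per the javadoc above,
             any positive value switches the executor to a LinkedBlockingQueue -->
        <task:executor id="pollerExecutor" pool-size="1-10"
            queue-capacity="10" rejection-policy="CALLER_RUNS" />

    Keep in mind that with a LinkedBlockingQueue the underlying ThreadPoolExecutor only grows beyond its core size once the queue is full, so a small capacity lets the pool scale out sooner than a large one would.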