Search code examples
spring-batchspring-integrationspring-messaging

Spring Integration - Producer Queue capacity limitations


Spring Integration - Producer Queue capacity limitations

We are using Remote partitioning with MessageChannelPartitionHandler to send partition messages to Queue(ActiveMQ) for workers to be pick and process. The job has huge data to process , many partition messages are publishing to queue and the aggregator of response from replyChannnel is failing with timeout of messages as all messages cant be processed in a given time. We also tried to limit messages published to queue by using queue capacity <integration:queue capacity="200"/> which resulted into server crash with heap dump generated due to memory issues of holding all these partition messages in internal memory.

We wanted to control the creation of StepExecution split itself , so that memory issue doesn’t occur. Example case is around 4k partition messages are being published to queue and whole job takes around 3hrs.

Can we control the publishing of messages to QueueChannel?

<bean id="senExtractMemberMasterPartitionHandler"   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
        <property name="messagingOperations" ref="senExtractMemberMasterPartitionMsgTemplate" />
        <property name="replyChannel" ref="senExtractProcessingMasterAggregatedChannel" />
        <property name="stepName" value="senExtractGeneratePrintRequestWorkerStep" />
        <property name="gridSize" value="500" />
</bean>
<bean id="senExtractMemberMasterPartitionMsgTemplate" class="org.springframework.integration.core.MessagingTemplate">
    <property name="defaultChannel" ref="senExtractProcessingMasterRequestChannel" />
    <property name="receiveTimeout" value="18000000" />
</bean>

<integration:channel id="senExtractProcessingMasterAggregatedChannel" >
    <integration:queue />
    <integration:interceptors>
        <integration:wire-tap channel="masterLoggingChannel" />
    </integration:interceptors>
</integration:channel>


<int-jms:outbound-gateway
    id="senExtractMasterOutGateway" 
    connection-factory="masterJMSConnectionFactory"
    correlation-key="JMSCorrelationID"
    request-channel="senExtractProcessingMasterRequestChannel"
    request-destination-name="senExtractRequestQueue" 
    reply-channel="senExtractProcessingMasterReplyChannel"
    reply-destination-name="senExtractReplyQueue" 
    async="true"
    auto-startup="true"
    reply-timeout="18000000" 
    receive-timeout="6000">
    <integration:poller ref="masterPoller"/>
    <int-jms:reply-listener />  
</int-jms:outbound-gateway>

Solution

  • The job has huge data to process , many partition messages are publishing to queue and the aggregator of response from replyChannnel is failing with timeout of messages as all messages cant be processed in a given time.

    You need to increase your timeout or add more workers. The Javadoc of MessageChannelPartitionHandler is clear about that:

    The receive timeout needs to be set realistically in the MessagingTemplate
    and the aggregator, so that there is a good chance of all work being done.
    

    We wanted to control the creation of StepExecution split itself

    Spring Batch provides the StepExecutionSplitter interface for that. If the default one (SimpleStepExecutionSplitter) does not suit your needs, you can provide a custom implementation to your partitioned step.