Tags: spring, spring-batch, spring-integration, spring-jms

Shutting down JMS listener container waiting for shutdown of message listener invokers


After shutdown is invoked on the JMS listener container, it keeps waiting for the message listener invokers to shut down. We call the shutdown from a job event listener after the job completes, and there are no messages left to consume from the request queue, so I am not sure why the message listeners are not shutting down. Below are the server logs after the shutdown call.

INFO  [org.springframework.batch.core.launch.support.SimpleJobLauncher] (aspAsyncExecutor-1) Job: [FlowJob: [name=SENEXTRACT]] completed with the following parameters: [{--listId=15195, --letterId=BF2025, --randomId=99a3d764-8cbf-4dbd-81c8-a5442e6e67e5}] and the following status: [COMPLETED] in 4m20s353ms
DEBUG [org.springframework.integration.channel.DirectChannel] (aspAsyncExecutor-1) preSend on channel 'bean 'controlChannel'', message: GenericMessage [payload=@senExtractInGateway.stop(), headers={id=6d7cdaaa-21e5-9a2f-e2ab-d83523747837, timestamp=1620803972934}]
DEBUG [org.springframework.integration.handler.ServiceActivatingHandler] (aspAsyncExecutor-1) ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@2decec67] (org.springframework.integration.config.ExpressionControlBusFactoryBean#0) received message: GenericMessage [payload=@senExtractInGateway.stop(), headers={id=6d7cdaaa-21e5-9a2f-e2ab-d83523747837, timestamp=1620803972934}]
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] (aspAsyncExecutor-1) Shutting down JMS listener container
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] (aspAsyncExecutor-1) Waiting for shutdown of message listener invokers
DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] (aspAsyncExecutor-1) Still waiting for shutdown of 30 message listener invokers (iteration 0)

Configuration is as follows.

<int-jms:inbound-gateway
        id="senExtractInGateway" connection-factory="connectionFactory"
        correlation-key="JMSCorrelationID"
        request-channel="senExtractProcessingRequestChannel"
        request-destination-name="senExtractRequestQueue"
        reply-channel="senExtractProcessingReplyChannel"
        default-reply-queue-name="senExtractReplyQueue"
        concurrent-consumers="1" max-concurrent-consumers="30"
        max-messages-per-task="1" reply-timeout="1800000"
        receive-timeout="1800000" auto-startup="false"/>

<integration:channel id="controlChannel" />

<integration:control-bus input-channel="controlChannel" />

Code Snippet:

    // Start the inbound gateway through the control bus before launching the job
    MessageChannel controlChannel = appContext.getBean("controlChannel", MessageChannel.class);
    controlChannel.send(new GenericMessage<String>("@senExtractInGateway.start()"));
    logger.info("Received before adapter started: ");
    //controlChannel.send(new GenericMessage<String>("@senExtractSrvActivator.start()"));
    JobExecution execution = jobLauncher.run(job, jobParameters);

    // Stop the gateway once the job has finished
    controlChannel.send(new GenericMessage<String>("@senExtractInGateway.stop()"));

Server Threads: (screenshot not available)


Solution

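  • Your configuration sets receive-timeout="1800000", which is 1,800,000 ms (30 minutes). So it is no surprise that each consumer stays blocked for up to 30 minutes while no message arrives. See the JavaDocs of the method that performs the receive: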

    /**
     * Actually receive a message from the given consumer.
     * @param consumer the JMS MessageConsumer to receive with
     * @param timeout the receive timeout (a negative value indicates
     * a no-wait receive; 0 indicates an indefinite wait attempt)
     * @return the JMS Message received, or {@code null} if none
     * @throws JMSException if thrown by JMS API methods
     * @since 4.3
     * @see #RECEIVE_TIMEOUT_NO_WAIT
     * @see #RECEIVE_TIMEOUT_INDEFINITE_WAIT
     */
    @Nullable
    protected Message receiveFromConsumer(MessageConsumer consumer, long timeout) throws JMSException {
    

    As long as consumers are blocked waiting for a message on the destination, the container cannot report itself as stopped; it therefore waits for those consumers to return from their receive calls and release their resources.
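
The effect described above can be reproduced with a minimal, JDK-only sketch. This is not Spring's implementation: the class name `ReceiveTimeoutDemo`, the method `millisToStop`, and the use of a `BlockingQueue` in place of a JMS consumer are all assumptions made for illustration. The only point it models is that a consumer which checks a "running" flag between timed receives cannot notice a stop request until its current receive returns.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; it only imitates a polling consumer, it is not Spring code.
class ReceiveTimeoutDemo {

    /**
     * Starts one consumer thread that polls an empty queue with the given
     * timeout, requests a stop, and returns how many milliseconds the thread
     * needed to exit after the stop request.
     */
    public static long millisToStop(long receiveTimeoutMillis) throws InterruptedException {
        BlockingQueue<String> emptyQueue = new LinkedBlockingQueue<>();
        AtomicBoolean running = new AtomicBoolean(true);

        Thread invoker = new Thread(() -> {
            // The flag is only checked between receive attempts.
            while (running.get()) {
                try {
                    // Blocks for up to receiveTimeoutMillis, like a timed receive on an empty queue.
                    emptyQueue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        invoker.start();
        Thread.sleep(50); // give the consumer time to enter its blocking poll

        long start = System.nanoTime();
        running.set(false); // request a stop without interrupting the thread
        invoker.join();     // wait for the consumer to exit, as the container waits for its invokers
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("timeout 2000 ms: stopped after " + millisToStop(2000) + " ms");
        System.out.println("timeout  200 ms: stopped after " + millisToStop(200) + " ms");
    }
}
```

In this sketch the time to stop tracks the poll timeout, which is the same relationship as between receive-timeout and the "Still waiting for shutdown of 30 message listener invokers" log line. For the gateway in the question, the analogous change would be a much smaller receive-timeout (Spring's `DefaultMessageListenerContainer` uses 1000 ms by default), leaving reply-timeout at whatever the job needs.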