Tags: java, concurrency, spring-integration, spring-jms, spring-integration-dsl

How to handle closing ExecutorService when used with Spring Integration Flows?


I am using an ExecutorService to ensure that incoming JMS messages are acknowledged after I write them to my database (using XA data sources and distributed transactions is not an option for us right now). To achieve this, my flow writes to the database when a message is received and then uses the ExecutorService to hand the rest of the processing off to a new thread.

@Bean
public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec() {
  final ExecutorService executorService = Executors.newCachedThreadPool();
  return channels -> channels.executor(executorService);
}

@Bean
public Consumer<HeaderEnricherSpec> errorChannelSpec(MessageChannel genericExceptionChannel) {
  return h -> h.header(MessageHeaders.ERROR_CHANNEL, genericExceptionChannel);
}

@Bean
public IntegrationFlow jmsMessageFlow(
    @Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory,
    Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec) {
  return IntegrationFlow.from(
          Jms.messageDrivenChannelAdapter(connectionFactory)
              .destination("INCOMING_QUEUE")
              .configureListenerContainer(
                  jmsListenerContainerSpec.andThen(spec -> spec.id("ListenerContainer")))
              .errorChannel(genericExceptionChannel)
              .outputChannel("messageHandlingChannel"))
      // save message in db
      .handle(
          (payload, headers) -> databaseService.save(payload),
          spec -> spec.advice(messageRetryAdvice).id("persistClientMessage"))
      // new thread so that the jms message is acknowledged
      .channel(jmsTxCommitingChannelSpec)
      .enrichHeaders(errorChannelSpec)
      .handle(
          (payload, headers) -> messageParser.extractMessageMetadata(payload),
          spec -> spec.id("extractMessageMetadata"))
      .route(incomingMessageRouter)
      .get();
}

I am not sure how to close the executor service in this case. Should it even be closed?


Solution

  • Consider using a ThreadPoolTaskExecutor, declared as a bean, instead of Executors.newCachedThreadPool(). It has proper lifecycle management: Spring shuts it down when the application is shut down.

    See more info in docs: https://docs.spring.io/spring-framework/reference/integration/scheduling.html#scheduling-task-executor
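
A minimal sketch of what that could look like, assuming the same jmsTxCommitingChannelSpec bean from the question. The class name ExecutorConfig, the bean name jmsTxCommitingExecutor, the pool sizes, queue capacity, thread name prefix, and the 30-second termination timeout are all illustrative assumptions, not values from the question; tune them for your workload. Because the executor is a Spring bean, the container calls its shutdown logic when the context closes, so no manual close is needed.

```java
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.Channels;
import org.springframework.integration.dsl.MessageChannelSpec;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical configuration class; the name is an assumption.
@Configuration
public class ExecutorConfig {

  // Spring-managed executor: initialized and shut down by the application context.
  @Bean
  public ThreadPoolTaskExecutor jmsTxCommitingExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // Placeholder sizing values (assumptions), unlike the unbounded cached pool.
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("jms-tx-");
    // Optional: let in-flight tasks finish before the context finishes closing.
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(30);
    return executor;
  }

  // Same channel spec as in the question, now backed by the managed executor.
  @Bean
  public Function<Channels, MessageChannelSpec<?, ?>> jmsTxCommitingChannelSpec(
      ThreadPoolTaskExecutor jmsTxCommitingExecutor) {
    return channels -> channels.executor(jmsTxCommitingExecutor);
  }
}
```

The flow definition itself does not need to change: .channel(jmsTxCommitingChannelSpec) still hands messages off to another thread, so the JMS listener thread returns and the message is acknowledged after the database write. Note that a bounded queue can reject tasks when it is full, which the cached thread pool in the question would not do.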