Search code examples
javaspringmultithreadingasynchronousspring-integration

Spring Integration - get warning logs for inbound-gateway response then continue with asynchronous processing in background


Description:

We have a Spring Integration flow where we want to have an endpoint configured, split the payload, perform some transformation in parallel and then after some validation one part return to the consumer i.e. with a http code 202 and the other items must continue to be processed asynchronous.

Explanation of the configuration:

We have a <int-http:inbound-gateway>. We split <int:splitter> the incoming payload to perform some transformation in parallel <int:dispatcher task-executor="taskExecutor"/> and then aggregate <int:aggregator> it to analyze the state.

If error we return error state if not we split again.

In the <int:header-value-router with input channel: mainItemRouter-channel The MAIN message will answer to the receiver thread so that the consumer of the API get a 202. The ITEM messages needs to be processed asynchronously even the consumer already received a 202.

As far as we can tell the configuration works, but we have for every asynchronous message we process, a warning log: "level":"WARN","loggerName":"org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel","message":"Reply message received but the receiving thread has already received a reply: ... ":

Why do we have this warning?

Is the configuration wrong?

Did we miss something on the configuration?

This is the configuration:

<beans>
  
    <int:channel id="postInboundReply-channel"/>
    <int:channel id="postAggregate-channel"/>
    
    <int:channel id="postFinish-channel">
    <int:header-enricher input-channel="postFinish-channel" output-channel="postInboundReply-channel">
        <int:header name="myHeader" overwrite="true"
                    expression="setMyHeader"/>
    </int:header-enricher>


    <!-- Endpoint -->
    <int-http:inbound-gateway id="postBulkInbound-gateway"
                              request-channel="postInboundRequest-channel"
                              reply-channel="postInboundReply-channel"
                              supported-methods="POST"
                              path="/api/tests"
                              request-payload-type="java.lang.String"
                              mapped-request-headers="content-type,Authorization"
                              mapped-response-headers="Content-Type,Location,myHeader"
                              reply-timeout="100000">
        <int-http:request-mapping consumes="application/json" produces="application/json"/>
    </int-http:inbound-gateway>

    <!-- split payload -->
    <int:splitter id='splitter' ref='splitterObject' method='split'
                  input-channel='postInboundRequest-channel' output-channel='mainRouter-channel'/>

    <!-- MAIN goes directly to aggregator, ERROR goes to the end -->
    <int:header-value-router input-channel="mainRouter-channel"  default-output-channel="executor-channel" header-name="myType">
        <int:mapping value="MAIN" channel="postAggregate-channel"/>
        <int:mapping value="ITEM" channel="executor-channel"/>
        <int:mapping value="ERROR" channel="postFinish-channel"/>
    </int:header-value-router>

    <!-- execute items in parallel -->
    <task:executor id="taskExecutor" pool-size="10" rejection-policy="DISCARD"/>
    <int:channel id="executor-channel">
        <int:dispatcher task-executor="taskExecutor"/>
    </int:channel>

    <int:transformer input-channel="executor-channel"
                     output-channel="postAggregate-channel"
                     ref="aTransformationBean"/>

    <!-- Checkpoint: items could be transformed -->
    <int:aggregator
      input-channel="postAggregate-channel"
      ref="transformationAggregator"
      method="aggregate"
      output-channel="afterAggregation-channel"
      release-lock-before-send="true"
    />

    <!-- before we need to check if one has transform error if yes return 400 else continue -->
    <int:header-value-router resolution-required="false" input-channel="afterAggregation-channel" default-output-channel="afterAggregationNonError-channel" header-name="myType">
        <int:mapping value="ERROR" channel="afterAggregationWithError-channel"/>
    </int:header-value-router>

    <int:transformer input-channel="afterAggregationWithError-channel"
                output-channel="postFinish-channel"
                ref="errorTransformer"/>

    <!-- split for further processing -->
    <int:splitter input-channel='afterAggregationNonError-channel' ref='processingSplitter' output-channel='mainItemRouter-channel'/>

    <!-- MAIN goes back to the inbound caller thread, ITEM processing asynchronouns -->
    <int:header-value-router input-channel="mainItemRouter-channel"  header-name="myType">
        <int:mapping value="MAIN" channel="mainReply-channel"/>
        <int:mapping value="ITEM" channel="asyncProcessingGatewayChain-channel"/>
    </int:header-value-router>

    <int:transformer input-channel="mainReply-channel"
                     output-channel="postFinish-channel"
                     ref="responderBean"/>

    <!-- execute parallel -->
    <task:executor id="processingExecutor" pool-size="10" rejection-policy="DISCARD" />
    <int:channel id="asyncProcessingGatewayChain-channel">
        <int:dispatcher task-executor="processingExecutor"/>
    </int:channel>

    <!-- wrap the whole async  process in a chain in order to catch exceptions in the error-channel -->
    <int:chain input-channel="asyncProcessingGatewayChain-channel">
      <int:gateway id="asyncProcessingGateway" request-channel="asyncProcessing-channel" error-channel="asyncProcessingError-channel"/>
    </int:chain>

    <int:chain input-channel="asyncProcessing-channel">
        <int:transformer ref="asyncTransformerBean"/>
        <int:transformer ref="asyncSendJmsMessageBean"/>
        <int:transformer ref="asyncStoreStateBean"/>
    </int:chain>
    
    <int:transformer input-channel="asyncProcessingError-channel"
                     ref="asyncProcessingExceptionHandler"/>
</beans>

Solution

  • A gateway can only receive one reply per request; hence the warning log; your async flow is also returning a reply.

    Send the output-channel of the async flow (chain) to nullChannel.

    Or, make the final component (asyncStoreStateBean) a service-activator that has a void return - that will also prevent a reply going back to the gateway. A transformer must return something.