spring, spring-integration, spring-integration-dsl

Timeout on replyChannel when wireTap is used


We use wireTap to record timestamps at different points of a flow. When we added it to our newest flow, it started causing a timeout on the replyChannel. As I understand the documentation, wireTap intercepts the message and sends it to a secondary channel without affecting the main flow, so it looks like the right tool for recording those timestamps. Are we using the wrong component for the job, or is something wrong with the configuration? If so, how would you recommend recording this information?

The exception:

o.s.integration.core.MessagingTemplate   : Failed to receive message from channel 'org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@21845b0d' within timeout: 1000

The code:

@Bean
public MarshallingWebServiceInboundGateway inboundGateway(Jaxb2Marshaller jaxb2Marshaller,
    DefaultSoapHeaderMapper defaultSoapHeaderMapper) {

    final MarshallingWebServiceInboundGateway inboundGateway =
        new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
    inboundGateway.setRequestChannelName(INPUT_CHANNEL_NAME);
    inboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
    return inboundGateway;
}

@Bean
public IntegrationFlow querySynchronous() {
    return IntegrationFlows.from(INPUT_CHANNEL_NAME)
        .enrichHeaders(...) 
        .wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
        .handle(outboundGateway)
        .wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_RESPONSE_RECEIVED_TIMESTAMP_NAME))
        //.transform( m -> m) // for tests - REMOVE
        .get();
}

And the timestamp flow:

public IntegrationFlow registerTimestampFlow(String asyncRequestReceivedTimestampName) {
    return channel -> channel.handle(
        m -> MetadataStoreConfig.registerFlowTimestamp(m, metadataStore, asyncRequestReceivedTimestampName));
}

The notable thing here is that if I uncomment the no-operation transformer, everything suddenly works fine, but that does not seem right, and I would like to avoid such workarounds.

Another thing is that a different, very similar flow works correctly without any workarounds. The notable difference is that it puts the message into Kafka using a Kafka adapter instead of calling a web service through an outbound gateway. It still generates a response to handle (with generateResponseFlow()), so it should behave the same way. Here is the flow that works fine:

@Bean
public MarshallingWebServiceInboundGateway workingInboundGateway(Jaxb2Marshaller jaxb2Marshaller,
    DefaultSoapHeaderMapper defaultSoapHeaderMapper, @Qualifier("errorChannel") MessageChannel errorChannel) {

    MarshallingWebServiceInboundGateway aeoNotificationInboundGateway =
        new MarshallingWebServiceInboundGateway(jaxb2Marshaller);
    aeoNotificationInboundGateway.setRequestChannelName(WORKING_INPUT_CHANNEL_NAME);
    aeoNotificationInboundGateway.setHeaderMapper(defaultSoapHeaderMapper);
    aeoNotificationInboundGateway.setErrorChannel(errorChannel);
    return aeoNotificationInboundGateway;
}

@Bean
public IntegrationFlow workingEnqueue() {
    return IntegrationFlows.from(WORKING_INPUT_CHANNEL_NAME)
        .enrichHeaders(...)
        .wireTap(performanceTimestampRegistrator
            .registerTimestampFlow(ASYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
        .filter(...)
        .filter(...)
        .publishSubscribeChannel(channel -> channel
            .subscribe(sendToKafkaFlow())
            .subscribe(generateResponseFlow()))
        .wireTap(performanceTimestampRegistrator
            .registerTimestampFlow(ASYNC_REQUEST_ENQUEUED_TIMESTAMP_NAME))
        .get();
}

There, having wireTap as the last component causes no problem, and the response is received on the replyChannel in time, without any workarounds.


Solution

  • The behavior is expected. When wireTap() (or log()) is used at the end of a flow, there is no reply by default. Since we can't assume what logic you intend to include in the flow definition, we do our best with the default behavior: the flow becomes a one-way, send-and-forget one. Some people specifically asked for the flow to be non-replying after log().

    To make it still reply to the caller, add a bridge() at the end of the flow. See more in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-log
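
    Applied to the flow from the question, the change might look like the fragment below. This is only a sketch: it reuses the question's own bean, keeps its elided enrichHeaders(...) arguments as they are, and assumes a Spring Integration version whose Java DSL offers the no-argument bridge().

    ```java
    @Bean
    public IntegrationFlow querySynchronous() {
        return IntegrationFlows.from(INPUT_CHANNEL_NAME)
            .enrichHeaders(...)
            .wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_REQUEST_RECEIVED_TIMESTAMP_NAME))
            .handle(outboundGateway)
            .wireTap(performanceTimestampRegistrator.registerTimestampFlow(SYNC_RESPONSE_RECEIVED_TIMESTAMP_NAME))
            // Without an output channel, the bridge forwards the message
            // to the replyChannel header, so the inbound gateway gets its reply.
            .bridge()
            .get();
    }
    ```

    This takes the place of the commented-out no-op transform(m -> m), which worked for the same reason: it made the wireTap no longer the last element of the flow.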

    It works in your more complex scenario because one of the subscribers of your publishSubscribeChannel is generateResponseFlow(), which produces the reply. Honestly, you need to be careful when combining request-reply behavior with such a publishSubscribeChannel configuration. The replyChannel can accept only one reply, so if you expect a reply from several subscribers, the resulting behavior will surprise you.

    The wireTap in that configuration is not a subscriber; it is an interceptor injected into that publishSubscribeChannel. So your assumption about similarity is misleading. The flow does end after that wireTap, but since one of the subscribers replies, you get the expected behavior. Think of the publishSubscribeChannel as a parallel electrical circuit in which every connection receives electricity independently of the others and does its job without affecting them. Anyway, that is a different story.

    To conclude: to reply from the flow after wireTap(), add a bridge(), and the reply message will be routed properly to the caller's replyChannel.
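
The two effects described above (a caller timing out when nothing replies, and a replyChannel that accepts only one reply) can be illustrated with a small plain-Java model. This is not Spring's implementation; OneShotReplyChannel and its methods are hypothetical names made up for this sketch, and the timeouts are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of a temporary reply channel that takes a single reply. */
final class OneShotReplyChannel {
    private final BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);
    private final AtomicBoolean replied = new AtomicBoolean(false);

    /** Accepts the first reply only; any later reply is rejected. */
    boolean send(String reply) {
        if (!replied.compareAndSet(false, true)) {
            return false;
        }
        return slot.offer(reply);
    }

    /** Waits up to timeoutMillis for a reply; returns null on timeout. */
    String receive(long timeoutMillis) {
        try {
            return slot.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // One-way flow: nothing ever replies, so the caller times out.
        OneShotReplyChannel oneWay = new OneShotReplyChannel();
        System.out.println(oneWay.receive(100)); // prints "null"

        // Two subscribers both try to reply: only the first one is accepted.
        OneShotReplyChannel pubSub = new OneShotReplyChannel();
        System.out.println(pubSub.send("reply from A")); // prints "true"
        System.out.println(pubSub.send("reply from B")); // prints "false"
        System.out.println(pubSub.receive(100));         // prints "reply from A"
    }
}
```

In this model, the first case corresponds to the flow that ends in wireTap() with no bridge(), and the second to a publishSubscribeChannel with more than one replying subscriber.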