Search code examples
javaspring-integrationspring-integration-dslspring-integration-http

Why client gets 500 response error although server doesn't experience any errors in spring-integration-http?


I have following server configuration:

@Configuration
@EnableIntegration
public class Config {

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class))
                .enrich(enricherSpec -> {
                    enricherSpec.header("correlationId", 1); //ackCorrelationId
                })
                .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
                .log()
                //.barrier(1000L)
                .log()
                .handle(Amqp.outboundAdapter(amqpTemplate())
                        .exchangeName("barrierExchange")
                        .routingKey("barrierKey")
                        .confirmAckChannel(confirmAckChannel())
                        .confirmCorrelationExpression("payload")
                )
                .get();
    }


    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost("localhost");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setPublisherConfirms(true);
        cachingConnectionFactory.setPublisherReturns(true);
        return cachingConnectionFactory;
    }

    @Bean
    public AmqpTemplate amqpTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    public DirectChannel confirmAckChannel() {
        return new DirectChannel();
    }

    @Bean

    public IntegrationFlow ackChannelListener() {
        return IntegrationFlows.from(confirmAckChannel())
                .handle(m -> {
                    System.out.println("ACK:" + m);
                })
                .get();
    }

}

and following client configuration:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(consoleSource(), consoleConsumer())
                .handle(httpOutboundGateway())
                .log()
                .channel("httpRequestChannel")
                .handle(s -> {
                    System.out.println("We got response: " + s);
                })
                .get();
    }

    private HttpMessageHandlerSpec httpOutboundGateway() {
        return Http.outboundGateway("http://localhost:8080/spring_integration_post") //http://localhost:8080/my_post
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class);
    }

    private Consumer<SourcePollingChannelAdapterSpec> consoleConsumer() {
        return c -> c.poller(Pollers.fixedRate(1000)
                .maxMessagesPerPoll(1));
    }

    public MessageSource<String> consoleSource() {
        return CharacterStreamReadingMessageSource.stdin();
    }
}

From client I send a,v,b

I see that server accepts that message, sends 3 messages into rabbitMq(via rabbit admin I see that messages are really accepted by rabbit) and gets 3 acknowledges:

ACK:GenericMessage [payload=a, headers={amqp_publishConfirm=true, id=eb8fd94b-5721-8b3b-5219-b7a4e0d950a8, timestamp=1567062603387}]
ACK:GenericMessage [payload=v, headers={amqp_publishConfirm=true, id=724b7def-1ab2-79b9-c788-27f6cbe24a33, timestamp=1567062603388}]
ACK:GenericMessage [payload=b, headers={amqp_publishConfirm=true, id=12a10799-d664-64dc-dd9e-1601fda61977, timestamp=1567062603389}]

But client prints following trace:

2019-08-29 10:10:04.392 ERROR 18404 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: HTTP request execution failed for URI [http://localhost:8080/spring_integration_post]; nested exception is org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null, failedMessage=GenericMessage [payload=a,v,b, headers={id=0b902e6f-d4de-329f-9916-19f94cdedc4e, timestamp=1567062603152}]
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:171)
    at org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler.handleRequestMessage(AbstractHttpRequestExecutingMessageHandler.java:289)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null
    at org.springframework.web.client.HttpServerErrorException.create(HttpServerErrorException.java:79)
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:124)
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102)
    at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63)
    at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:710)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:598)
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:165)
    ... 30 more

Why it happens?

How can I fix it ?

P.S.

As @Artem Bilan said my problem that I don't respond anything to client so client experienced timeout error. I would like to say that error is really confusing. I would expect 504 error fir timeout.

I had an attempt to return something to the client after all messages was sent to rabbit so I wrote following config:

@Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class))
            .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
            .log()
            .handle(Amqp.outboundAdapter(amqpTemplate())
                    .exchangeName("barrierExchange")
                    .routingKey("barrierKey")
                    .confirmAckChannel(confirmAckChannel())
                    .confirmCorrelationFunction(Message::getPayload)
            ).handle((payload, headers) -> {
                System.out.println("Before aggregation");
                return true;
            })
            .aggregate()
            .handle((payload, headers) -> {
                System.out.println("After aggregation");
                return true;
            }).get();

But from client side I see the same stack trace. Also From server side I see that trace:

2019-08-30 12:19:10.166  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=A, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=bb52003e-0731-d64c-1285-e28adb93c87a, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750166}]
2019-08-30 12:19:10.167  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=A, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=bb52003e-0731-d64c-1285-e28adb93c87a, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750166}]
2019-08-30 12:19:10.172  INFO 12224 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:5672
2019-08-30 12:19:10.198  INFO 12224 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#123d7057:0/SimpleConnection@2ae574e5 [delegate=amqp://[email protected]:5672/, localPort= 63045]
2019-08-30 12:19:10.224  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=B, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=b4b5415d-4ad9-167b-2bea-905ec1cc66cf, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750224}]
2019-08-30 12:19:10.225  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=B, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=b4b5415d-4ad9-167b-2bea-905ec1cc66cf, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750224}]
ACK:GenericMessage [payload=A, headers={amqp_publishConfirm=true, id=5d85f459-661d-69c4-4d36-b5843289b41b, timestamp=1567156750227}]
2019-08-30 12:19:10.227  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=C, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=a4a4838b-849f-35dd-b451-a38b5a5edb13, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750227}]
2019-08-30 12:19:10.228  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=C, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=a4a4838b-849f-35dd-b451-a38b5a5edb13, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750227}]
ACK:GenericMessage [payload=B, headers={amqp_publishConfirm=true, id=1136d70b-8dc5-2397-f936-0246c0ad8073, timestamp=1567156750231}]
ACK:GenericMessage [payload=C, headers={amqp_publishConfirm=true, id=1b74d944-ec4c-75c2-83f5-3b36a8e89a39, timestamp=1567156750232}]

It means that 2 my handlers were not invoked at all. Why?

Update

I also was able to apply advice related with publish subscribe:

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class))
//                .enrich(enricherSpec -> {
//                    enricherSpec.header("correlationId", 1); //ackCorrelationId
//                })
                .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
                .log()
                //.barrier(1000L)
                .log()
                .publishSubscribeChannel(publishSubscribeSpec -> {
                            publishSubscribeSpec.applySequence(true);
                            publishSubscribeSpec.subscribe(f -> Amqp.outboundAdapter(amqpTemplate())
                                    .exchangeName("barrierExchange")
                                    .routingKey("barrierKey")
                                    .confirmAckChannel(confirmAckChannel())
                                    .confirmCorrelationFunction(Message::getPayload));
                            publishSubscribeSpec.subscribe(flow -> {
                                flow.handle((p, h) -> "from server: " + p);
                            });
                        }
                ) .get();

The error is disappear but clients gets only last splitted part:

1,2,3,4,5
2019-08-30 12:48:24.570  INFO 19476 --- [ask-scheduler-7] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=from server: 5, headers={connection=keep-alive, id=984d89ef-aa99-7f37-8bfa-99612c05831a, Content-Length=14, contentType=text/plain;charset=UTF-8, http_statusCode=200 OK, Date=1567158504000, timestamp=1567158504569}]
We got response: GenericMessage [payload=from server: 5, headers={connection=keep-alive, id=984d89ef-aa99-7f37-8bfa-99612c05831a, Content-Length=14, contentType=text/plain;charset=UTF-8, http_statusCode=200 OK, Date=1567158504000, timestamp=1567158504569}]

As far I understand at this case 2 subflows are independable and for some reason only last part is sent to the client....


Solution

  • Your problem is here:

    .handle(Amqp.outboundAdapter(amqpTemplate())
                        .exchangeName("barrierExchange")
                        .routingKey("barrierKey")
                        .confirmAckChannel(confirmAckChannel())
                        .confirmCorrelationExpression("payload")
                )
                .get();
    

    You just stop your flow meanwhile it starts from the Http.inboundGateway() which excepts some reply to sent back. With the Amqp.outboundAdapter() you make your flow as one-way, so nothing is going to send reply back to that waiting inbound gateway.

    Since there is no reply sent back to the client it ends up with the request timeout error indicating that server is guilty to not send a reply in time.

    This is an answer to your situation. How to fix and what is out of scope of this question. I see that you try to reproduce a barrier sample from XML to Java DSL, but you are missing a lot of pieces in your configuration. And you also brake the logic in several places:

    enricherSpec.header("correlationId", 1)
    

    You use the same static 1 for every single request. So, when time comes to correlation logic you are going to have problems with collision between different requests.

     .split(s -> s.applySequence(false)
    

    If you are going to implement barrier, you indeed will have an aggregator, when the logic is really based on the correlation. So, the applySequence must be true for proper aggregation afterwards. And from here that header("correlationId" is wrong again. We need one correlation key for barrier (essentially between request and delayed reply). That's why it is like ackCorrelationId in the sample. And another built-in correlationId (alongside with other correlation details) which is populated by the spitter and used by the aggregator.