Tags: spring, spring-integration, spring-webflux

Spring Integration WebFlux fails on every other call


I have created the following pipeline, which exposes an endpoint that accepts POST requests, calls the GET method of another REST service, and returns the response. For some reason I cannot understand, every second call gets a Bad Request.

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.webflux.dsl.WebFlux;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.util.UriComponentsBuilder;

@Configuration
@EnableIntegration
@Slf4j
public class SpringIntegrationFLowConfiguration {

    @Bean
    public IntegrationFlow inboundPost(
            @Qualifier("inputChannel") MessageChannel inputChannel) {

        return IntegrationFlow
                .from(WebFlux.inboundGateway("/reactivePost")
                        .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                        .requestPayloadType(String.class)
                        .statusCodeFunction(m -> HttpStatus.OK))
                .channel(inputChannel)
                .get();
    }

    @Bean("inputChannel")
    public PublishSubscribeChannel inputChannel() {
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        channel.setDatatypes(String.class);
        return channel;
    }

    @Bean("outputGet")
    public IntegrationFlow outboundChannelFlow(
            @Qualifier("inputChannel") MessageChannel inputChannel) {

        return IntegrationFlow
                .from(inputChannel)
                .handle(WebFlux.<String>outboundGateway(message ->
                                UriComponentsBuilder.fromUriString("http://localhost:8080/")
                                        .pathSegment(message.getPayload())
                                        .build()
                                        .toUri())
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(String.class))
                .get();
    }
}

I also created a very simple Spring application with a REST controller that replies to the GET request from this pipeline, so I can test it:

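import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class HelloController {

    // Replies to GET /{name} with a greeting built from the path segment.
    @GetMapping("/{name}")
    public Mono<String> hello(@PathVariable(name = "name") String name) {
        return Mono.just("Hello " + name + "!");
    }
}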

I send requests to this pipeline with a text body. The first call returns the correct response, but the second call gets a 400 Bad Request in the pipeline, and the HelloController application logs the following error:

r.n.http.server.HttpServerOperations : [33dbeb61, L:/127.0.0.1:8080 - R:/127.0.0.1:51916] Decoding failed: FULL_REQUEST(decodeResult: failure(java.lang.IllegalArgumentException: text is empty (possibly HTTP/0.9)), version: HTTP/1.0, content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0)) GET /bad-request HTTP/1.0


Solution

  • It looks like your loggerFlow is one-way (send-and-forget). See whether nullChannel() at the end of that flow helps. More info in the docs: https://docs.spring.io/spring-integration/reference/dsl/java-log.html#page-title

    So, if the flow is not expected to produce a reply at its end, it is recommended to use nullChannel() after the last log().

    UPDATE

    Found the problem.

    The error from Netty is a bit cryptic:

    java.lang.IllegalArgumentException: text is empty (possibly HTTP/0.9)), version: HTTP/1.0
    

    It turns out that when we relay a POST to a GET, we still have a Content-Length: 5 header mapped from the original request into the message headers, and WebFlux.outboundGateway() maps it onto the outgoing request by default. Apparently this triggers Netty on the other side to try to decode a request body, which is definitely empty because we perform a GET on the client.

    So, the solution for you is like this:

                .from(inputChannel)
                .headerFilter("Content-Length")
                .handle(WebFlux.<String>outboundGateway(message ->
    

    That is, remove the Content-Length header, which has nothing to do with the GET request we are performing.

    See more in: Netty Decoding failed DefaultFullHttpRequest

    I think I'll fix Spring Integration to remove that possible header in this case:

        HttpHeaders httpHeaders = mapHeaders(message);
        if (!shouldIncludeRequestBody(httpMethod)) {
            return new HttpEntity<>(httpHeaders);
        }
    

    where:

    private static final List<HttpMethod> NO_BODY_HTTP_METHODS =
            Arrays.asList(HttpMethod.GET, HttpMethod.HEAD, HttpMethod.TRACE);
    

    So, for a request with no body, no Content-Length header should be mapped to the HTTP request. Apparently a regular HTTP client just ignores that header somehow.

    Not sure, though, why that works for the first request...
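
The "no body, no Content-Length" rule proposed above can be sketched in plain Java, without Spring. This is only an illustration under stated assumptions: the class NoBodyHeaderSketch and the method headersFor are hypothetical names, not Spring Integration API, and headers are modeled as a simple map of strings.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; this is not the Spring Integration implementation.
public class NoBodyHeaderSketch {

    // Methods sent without a request body, mirroring NO_BODY_HTTP_METHODS from the answer.
    private static final Set<String> NO_BODY_HTTP_METHODS = Set.of("GET", "HEAD", "TRACE");

    // Returns a copy of the mapped headers, dropping Content-Length for no-body methods.
    public static Map<String, String> headersFor(String httpMethod, Map<String, String> mapped) {
        Map<String, String> result = new LinkedHashMap<>(mapped);
        if (NO_BODY_HTTP_METHODS.contains(httpMethod.toUpperCase(Locale.ROOT))) {
            result.keySet().removeIf(name -> name.equalsIgnoreCase("Content-Length"));
        }
        return result;
    }

    public static void main(String[] args) {
        // Headers as they might arrive from the original POST request.
        Map<String, String> fromPost = new LinkedHashMap<>();
        fromPost.put("Content-Length", "5");
        fromPost.put("Accept", "text/plain");

        System.out.println("GET:  " + headersFor("GET", fromPost));
        System.out.println("POST: " + headersFor("POST", fromPost));
    }
}
```

In this sketch the GET request keeps only the Accept header, while the POST request keeps both. In the actual flow, .headerFilter("Content-Length") achieves the same effect by removing the message header before the outbound gateway maps it.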