Tags: java, spring-boot, maven, spring-webflux, spring-reactor

Spring Gateway AsyncPredicate not working with reactor and flux


We have written a custom predicate factory for Spring Cloud Gateway to route requests. We parse the body of an XML request and derive the route from a particular method that appears in the body. To do this, we wrote the following code, which creates a ServerRequest from the cached body:

    @Override
    public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
        return exchange -> {
            Class<String> inClass = String.class;

            Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);

            if (cachedBody != null) {
                try {
                    boolean test = config.pattern.matcher((String) cachedBody).matches();
                    exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                    return Mono.just(test);
                } catch (ClassCastException e) {
                    LOG.error("Predicate test failed because String.class does not match the cached body object", e);
                }
                return Mono.just(false);
            } else {

                return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

            }
        };
    }
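For illustration, the pattern test at the heart of this predicate can be sketched in isolation with plain Java. The class name, the `getOrder` element, and the sample body below are hypothetical and not taken from our real configuration. The sketch shows one detail that matters for XML bodies: `Matcher.matches()` has to match the entire input, and `.` does not match line breaks unless `Pattern.DOTALL` is set.

```java
import java.util.regex.Pattern;

class BodyPatternDemo {

    // Hypothetical pattern: matches any body that contains a <getOrder> element.
    // DOTALL lets '.' span line breaks, which multi-line XML bodies need.
    static final Pattern GET_ORDER =
            Pattern.compile(".*<getOrder[\\s>/].*", Pattern.DOTALL);

    // Same test the predicate applies to the cached body string.
    static boolean routeMatches(Pattern pattern, String body) {
        return pattern.matcher(body).matches();
    }

    public static void main(String[] args) {
        String body = "<Envelope>\n  <Body>\n    <getOrder><id>42</id></getOrder>\n  </Body>\n</Envelope>";

        // With DOTALL the whole multi-line body matches.
        System.out.println(routeMatches(GET_ORDER, body));

        // Without DOTALL, matches() fails because '.' stops at the first line break.
        Pattern singleLine = Pattern.compile(".*<getOrder[\\s>/].*");
        System.out.println(routeMatches(singleLine, body));
    }
}
```

If the pattern only needs to occur somewhere in the body, `Matcher.find()` would be an alternative to wrapping the expression in `.*`.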

This solution worked perfectly with an older version of the Spring Boot parent (2.1.7.RELEASE) and spring-cloud-dependencies (Greenwich.RELEASE). With the latest version of the Spring Boot parent (2.3.1.RELEASE) and spring-cloud-dependencies (Hoxton.SR6), the gateway application still starts normally without any error, but I am getting the following exception:

Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
        at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
        ... 84 more

Has anyone else had the same problem and knows how to solve it?


Solution

  • The problem was that the Greenwich version of those APIs was still in beta. In the newer version, the object stored under CACHED_REQUEST_BODY_ATTR is required to be a PooledDataBuffer, so I changed my code accordingly. It now looks like this:

    return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

        // Increase the reference count so the joined buffer stays usable after decoding.
        DataBufferUtils.retain(dataBuffer);

        // Replayable view of the body for the decorated request.
        Flux<DataBuffer> cachedFlux = Flux
                .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

        // RemoveCachedBodyFilter casts the attribute to PooledDataBuffer, so store a buffer, not a Flux.
        // Note: this cast assumes a pooled (e.g. Netty) buffer implementation.
        PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());

        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

            @Override
            public Flux<DataBuffer> getBody() {
                return cachedFlux;
            }
        };
        return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                .bodyToMono(inClass).doOnNext(objectValue -> {
                    exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                    exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
                }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
    });

    After updating the class, the predicate works as expected.
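To make the failure mode concrete, here is a minimal sketch in plain Java, without any Spring classes. The stack trace suggests that the cleanup filter casts whatever is stored under the attribute key to a pooled buffer type. Everything in the sketch (`PooledBuffer`, `SimpleBuffer`, `CACHED_BODY_KEY`, and both release methods) is a hypothetical stand-in, not the gateway's actual implementation. It contrasts an unconditional cast with a defensive `instanceof` check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class CachedBodyAttributeDemo {

    // Hypothetical stand-in for a pooled, reference-counted buffer type.
    interface PooledBuffer {
        boolean isAllocated();
        void release();
    }

    // Trivial buffer that only tracks whether it has been released.
    static final class SimpleBuffer implements PooledBuffer {
        private boolean allocated = true;
        public boolean isAllocated() { return allocated; }
        public void release() { allocated = false; }
    }

    // Hypothetical attribute key, playing the role of CACHED_REQUEST_BODY_ATTR.
    static final String CACHED_BODY_KEY = "cachedRequestBody";

    // Unconditional cast: throws ClassCastException if anything else was stored.
    static void releaseWithCast(Map<String, Object> attributes) {
        PooledBuffer buffer = (PooledBuffer) attributes.remove(CACHED_BODY_KEY);
        if (buffer != null && buffer.isAllocated()) {
            buffer.release();
        }
    }

    // Defensive variant: releases only real buffers and reports whether it did.
    static boolean releaseIfPooled(Map<String, Object> attributes) {
        Object value = attributes.remove(CACHED_BODY_KEY);
        if (value instanceof PooledBuffer) {
            PooledBuffer buffer = (PooledBuffer) value;
            if (buffer.isAllocated()) {
                buffer.release();
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();

        // Storing a lazy body source (like the Flux in the question) breaks the cast.
        Supplier<String> lazyBody = () -> "<xml/>";
        attributes.put(CACHED_BODY_KEY, lazyBody);
        try {
            releaseWithCast(attributes);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }

        // Storing an actual buffer lets the cleanup release it.
        SimpleBuffer buffer = new SimpleBuffer();
        attributes.put(CACHED_BODY_KEY, buffer);
        releaseWithCast(attributes);
        System.out.println("still allocated: " + buffer.isAllocated());
    }
}
```

The sketch only models the type mismatch. In the real gateway the safer route is the one taken above: store the type the framework expects under its attribute key, or keep custom objects under a separate key of your own.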