java, spring, spring-integration, spring-integration-http

Spring Integration fetch paginated results from a REST service


I'm working on an integration with a REST service. The service is polled by an outbound gateway, marketingCategoryOutboundGateway, implemented by HttpRequestExecutingMessageHandler. The gateway makes a request to the REST service and pushes the response to the marketingCategory channel. The gateway itself is triggered by a message that marketingCategoryPollerMessageSource creates using the makeTriggeringMessage factory method.

The problem is that the service returns paginated results. I need something that would listen on the marketingCategory channel alongside the service activator I already have, check whether the response is the last page and, if it is not, push a new message with an incremented page number, created by makeTriggeringMessage, to the marketingCategoryPoller channel, so that the code loops until it has fetched all the pages from the REST service.

Does Spring Integration provide filters that receive a message on an input channel, test it against a condition and push a new message to the output channel if the condition is true?

The code:

//Responses from the REST service go to this channel
@Bean("marketingCategory")
MessageChannel marketingCategory() { return new PublishSubscribeChannel();}

//This channel is used to trigger the outbound gateway which makes a request to the REST service
@Bean
MessageChannel marketingCategoryPoller() {return new DirectChannel();}

//An adapter creating triggering messages for the gateway
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<String> marketingCategoryPollerMessageSource() { return () -> makeTriggeringMessage(1);}

//A factory for producing messages which trigger the gateway
private Message<String> makeTriggeringMessage(int page) {
    //make a message for triggering marketingCategoryOutboundGateway
    return MessageBuilder.withPayload("")
            .setHeader("Host", "eclinic")
            .setHeader("page", page)
            .build();
}

//An outbound gateway, makes a request to the REST service and returns the response to marketingCategory channel
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
    //make a request to the REST service and push the response to the marketingCategory channel
}

//handler for REST service responses
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
    return (msg) -> {
        //process the categories returned by marketingCategoryOutboundGateway
    };
}

Solution

  • I've found a solution based on the post "Read and download from a paginated REST-Services with spring integration":

    1. An inbound channel adapter with a poller triggers the outbound gateway, which talks to the REST service and pushes the response to a channel. The inbound channel adapter is a message source that generates the initial message, with a header indicating the page number to fetch from the REST API. The outbound gateway uses the page message header to build a URL specifying the desired page.

    2. The channel to which the outbound gateway pushes REST service responses has 2 subscribers:

      2.1. a service activator which does something with the fetched data

      2.2. a filter, which checks whether this is the last page and, if it is not, passes the message on to another channel consumed by a header enricher

    3. Having received a message, the header enricher increments its page header and pushes the message on to the channel that triggers the outbound gateway. The gateway reads the incremented page header and fetches the next page from the REST service.

    4. The loop keeps spinning until the REST service returns the last page. The filter doesn't let that message pass through to the header enricher, which breaks the loop.
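
The control flow of steps 1 to 4 can be modelled in plain Java, without Spring, to make the loop easier to follow. This is only a sketch: PaginationLoopSketch, Page, runCycle and fakeService are hypothetical names that do not appear in the configuration, and the fake service stands in for the real REST call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/** Plain-Java model of the message loop; no Spring Integration involved. */
final class PaginationLoopSketch {

    /** Hypothetical stand-in for the page payload returned by the REST service. */
    static final class Page {
        final int number;    // zero-based page index
        final boolean last;  // true when no further page exists

        Page(int number, boolean last) {
            this.number = number;
            this.last = last;
        }
    }

    /**
     * Models one polling cycle: starts at page 0 and keeps requesting pages
     * until the "filter" sees the last page. Returns the handled page numbers.
     */
    static List<Integer> runCycle(IntFunction<Page> gateway) {
        List<Integer> handled = new ArrayList<>();
        int pageHeader = 0;                        // step 1: triggering message carries page = 0
        while (true) {
            Page page = gateway.apply(pageHeader); // the outbound gateway fetches the requested page
            handled.add(page.number);              // step 2.1: the service activator consumes the data
            if (page.last) {                       // step 2.2: the filter drops the last page's message
                break;                             // step 4: nothing reaches the enricher, the loop ends
            }
            pageHeader = pageHeader + 1;           // step 3: the header enricher increments the header
        }
        return handled;
    }

    /** Fake REST service with a fixed number of pages (assumed to be at least 1). */
    static IntFunction<Page> fakeService(int totalPages) {
        return n -> new Page(n, n + 1 >= totalPages);
    }

    public static void main(String[] args) {
        System.out.println(runCycle(fakeService(3))); // prints [0, 1, 2]
    }
}
```

In the real flow each iteration is a message travelling through channels rather than a loop body, and the poller starts a new cycle from page 0 after every fixed delay.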

    Full code:

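    // Imports for the Spring classes used below; ApiGateConfig, RestPageImpl and
    // MarketingCategory are project classes and must be imported from your own packages.
    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.ParameterizedTypeReference;
    import org.springframework.expression.Expression;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.http.HttpMethod;
    import org.springframework.integration.annotation.Filter;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.annotation.Transformer;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.channel.PublishSubscribeChannel;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.expression.ValueExpression;
    import org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.integration.transformer.HeaderEnricher;
    import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
    import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;

    @Configuration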
    public class IntegrationConfiguration {
    
        private final ApiGateConfig apiGateConfig;
    
        IntegrationConfiguration(ApiGateConfig apiGateConfig) {
            this.apiGateConfig = apiGateConfig;
        }
    
        @Bean("marketingCategory")
        MessageChannel marketingCategory() {
            return new PublishSubscribeChannel();
        }
    
        @Bean
        MessageChannel marketingCategoryPoller() {
            return new DirectChannel();
        }
    
        @Bean
        MessageChannel marketingCategoryPollerNextPage() {
            return new DirectChannel();
        }
    
        @Bean
        @InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
        public MessageSource<RestPageImpl<MarketingCategory>> marketingCategoryPollerMessageSource() {
            return () -> makeTriggeringMessage(0);
        }
    
        /**
         * Build a gateway triggering message
         */
        private Message<RestPageImpl<MarketingCategory>> makeTriggeringMessage(int page) {
            return MessageBuilder.withPayload(new RestPageImpl<MarketingCategory>())
                    .setHeader("Host", "eclinic")
                    .setHeader("page", page)
                    .build();
        }
    
        @Bean
        @ServiceActivator(inputChannel = "marketingCategoryPoller")
        public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
    
            String uri = apiGateConfig.getUri() + "/marketingCategories?page={page}";
    
            //the type of the payload
            ParameterizedTypeReference<RestPageImpl<MarketingCategory>> type = new ParameterizedTypeReference<>() {
            };
    
            //page number comes from the message
            SpelExpressionParser expressionParser = new SpelExpressionParser();
            var uriVariables = new HashMap<String, Expression>();
            uriVariables.put("page", expressionParser.parseExpression("headers.page"));
    
            HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);
            handler.setHttpMethod(HttpMethod.GET);
            handler.setExpectedResponseTypeExpression(new ValueExpression<>(type));
            handler.setOutputChannel(channel);
            handler.setUriVariableExpressions(uriVariables);
    
            return handler;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "marketingCategory")
        public MessageHandler marketingCategoryHandler() {
            return (msg) -> {
                var page = (RestPageImpl<MarketingCategory>) msg.getPayload();
    
                System.out.println("Page #" + page.getNumber());
    
                page.getContent().forEach(c -> System.out.println(c.getMarketingCategory()));
    
            };
        }
    
        @Filter(inputChannel = "marketingCategory", outputChannel = "marketingCategoryPollerNextPage")
        public boolean marketingCategoryPaginationFilter(RestPageImpl<MarketingCategory> page) {
            return !page.isLast();
        }
    
        @Bean
        @Transformer(inputChannel = "marketingCategoryPollerNextPage", outputChannel = "marketingCategoryPoller")
        HeaderEnricher incrementPage() {
            Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
            Expression expression = new SpelExpressionParser().parseExpression("headers.page+1");
    
            var valueProcessor = new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, Integer.class);
            valueProcessor.setOverwrite(true);
    
            headersToAdd.put("page", valueProcessor);
            return new HeaderEnricher(headersToAdd);
        }
    }
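
The configuration above relies on RestPageImpl, which is not shown. The handler and the filter only call getNumber(), getContent() and isLast() on it, so the contract can be illustrated with a plain-Java stand-in. PageSketch is a hypothetical name, and the assumption here is that the REST service reports zero-based page numbers and a total page count, as a Spring Data Page does; the real class would also need to be deserializable from the service's JSON.

```java
import java.util.List;

/**
 * Hypothetical plain-Java stand-in for the page payload. It only models the
 * three accessors that the handler and the filter use.
 */
final class PageSketch<T> {
    private final List<T> content;
    private final int number;      // zero-based index of this page (assumption)
    private final int totalPages;  // total number of pages reported by the service (assumption)

    PageSketch(List<T> content, int number, int totalPages) {
        this.content = content;
        this.number = number;
        this.totalPages = totalPages;
    }

    List<T> getContent() {
        return content;
    }

    int getNumber() {
        return number;
    }

    /** A page is the last one when no page follows it. */
    boolean isLast() {
        return number + 1 >= totalPages;
    }

    public static void main(String[] args) {
        PageSketch<String> first = new PageSketch<>(List.of("a", "b"), 0, 2);
        PageSketch<String> second = new PageSketch<>(List.of("c"), 1, 2);
        System.out.println(first.isLast());  // false: the filter passes the message on
        System.out.println(second.isLast()); // true: the filter discards it, ending the loop
    }
}
```

With zero-based numbering, starting the triggering message at page 0, as makeTriggeringMessage(0) does, requests the first page.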