
Read from a paginated API in an Apache Camel route


How can I read from a paginated REST API endpoint, or from a JDBC SQL query that fetches "K" items/records at a time, using the Apache Camel DSL? I would appreciate a clean example.

Thanks in advance.


Solution

  • I have done this using the loopDoWhile DSL, which repeats the enclosed steps for as long as a predicate evaluates to true:

    from("direct:start")
        .loopDoWhile(stopLoopPredicate())
            .to("bean:restAPIProcessor")
            .to("bean:dataEnricherBean")
        .end();
    

    The stopLoopPredicate() method is defined as follows:

    public Predicate stopLoopPredicate() {
        return new Predicate() {
            @Override
            public boolean matches(Exchange exchange) {
                // Keep looping while there is a body and it is not the "stopLoop" sentinel.
                Object body = exchange.getIn().getBody();
                return body != null && !"stopLoop".equalsIgnoreCase(body.toString());
            }
        };
    }
    

    The restAPIProcessor is an implementation of Processor where the REST API call is made.

    The pagination logic is implemented in restAPIProcessor: it keeps the current offset in the exchange headers, and as soon as the REST API returns a null or empty response it sets the exchange body to "stopLoop", which makes the predicate return false and ends the loop. This works pretty well. Here is the code for RestAPIProcessor:

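    import java.util.LinkedHashMap;
    import java.util.Map;

    import javax.inject.Inject; // jakarta.inject.Inject on newer stacks

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.springframework.http.HttpEntity;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.MediaType;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.client.RestTemplate;

    public class RestAPIProcessor implements Processor {

        @Inject
        private RestTemplate restTemplate;

        private static final int LIMIT = 100;

        private static final String REST_API = "<REST API URL>";

        @Override
        public void process(Exchange exchange) throws Exception {
            // Paging state is carried from one loop iteration to the next in headers.
            Integer offset = exchange.getIn().getHeader("offset", Integer.class);
            Integer count = exchange.getIn().getHeader("count", Integer.class);

            if (offset == null) offset = 0;
            if (count == null) count = 0;

            Map<String, Object> body = new LinkedHashMap<>();
            body.put("offset", offset);
            body.put("limit", LIMIT);
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers);
            ResponseEntity<String> responseEntity =
                    restTemplate.exchange(REST_API, HttpMethod.POST, entity, String.class);
            String response = responseEntity.getBody();

            // Update the In message in place. Calling getOut() would create a fresh,
            // empty message and silently drop the "stopLoop" body set on In.
            if (response == null || response.isEmpty()) {
                exchange.getIn().setBody("stopLoop");
            } else {
                exchange.getIn().setHeader("count", count + LIMIT);
                // The offset advances by one per call, i.e. it is treated as a page index.
                // Use offset + LIMIT instead if the API expects a record offset.
                exchange.getIn().setHeader("offset", offset + 1);
                exchange.getIn().setBody(response);
            }
        }
    }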
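
The loop-and-sentinel contract described above can also be tried out without Camel or a real endpoint. The following is only a minimal sketch under stated assumptions: PagedSource and PaginationSketch are hypothetical names that are not part of the route, the "API" is an in-memory list, and the offset is assumed to be a record offset that advances by the page size (unlike the processor above, which advances it by one per call).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the paginated backend (an in-memory list, not a real API).
class PagedSource {
    private final List<Integer> records = new ArrayList<>();
    int calls = 0; // number of page requests made so far

    PagedSource(int total) {
        for (int i = 0; i < total; i++) {
            records.add(i);
        }
    }

    // Returns up to `limit` records starting at `offset`; an empty list once exhausted.
    List<Integer> fetchPage(int offset, int limit) {
        calls++;
        if (offset >= records.size()) {
            return Collections.emptyList();
        }
        int end = Math.min(offset + limit, records.size());
        return new ArrayList<>(records.subList(offset, end));
    }
}

class PaginationSketch {
    static final String STOP = "stopLoop";

    // Mirrors the route: the condition is checked before every iteration,
    // and the "processor" step swaps the body for the sentinel on an empty page.
    static List<Integer> readAll(PagedSource source, int limit) {
        List<Integer> all = new ArrayList<>();
        int offset = 0;
        Object body = "start"; // any non-null, non-sentinel body lets the first check pass
        while (body != null && !STOP.equalsIgnoreCase(body.toString())) {
            List<Integer> page = source.fetchPage(offset, limit);
            if (page.isEmpty()) {
                body = STOP;           // no more data: signal the loop to end
            } else {
                all.addAll(page);      // stands in for the enrichment step
                offset += page.size(); // assumption: offset counts records, not pages
                body = page;
            }
        }
        return all;
    }

    public static void main(String[] args) {
        PagedSource source = new PagedSource(250);
        List<Integer> all = readAll(source, 100);
        // prints "250 records in 4 calls"
        System.out.println(all.size() + " records in " + source.calls + " calls");
    }
}
```

Note that one extra request is always needed to discover the empty page, which is why 250 records at a limit of 100 take four calls rather than three. The same applies to the Camel route: the initial message sent to direct:start must carry a non-null body other than "stopLoop", otherwise the predicate fails before the first iteration.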