How can I read from a paginated REST API endpoint, or from a JDBC SQL query that fetches "K" items/records at a time, using the Apache Camel DSL? I would appreciate a clean example.
Thanks in advance.
I have done this using the loopDoWhile DSL:
from("direct:start")
    .loopDoWhile(stopLoopPredicate())
        .to("bean:restAPIProcessor")
        .to("bean:dataEnricherBean")
    .end();
The stopLoopPredicate() method is defined as follows:
public Predicate stopLoopPredicate() {
    // Despite the name, this returns true while the loop should keep running.
    return new Predicate() {
        @Override
        public boolean matches(Exchange exchange) {
            Object body = exchange.getIn().getBody();
            // Continue until the body is null or equals the "stopLoop" marker.
            return body != null && !"stopLoop".equalsIgnoreCase(body.toString());
        }
    };
}
The restAPIProcessor bean is an implementation of Processor that makes the REST API call.
The pagination logic lives in restAPIProcessor: as soon as the REST API returns a null or empty response, the processor sets the exchange body to "stopLoop", which makes the predicate end the loop. This works pretty well. Here is the code for RestAPIProcessor:
import java.util.LinkedHashMap;
import java.util.Map;
import javax.inject.Inject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

public class RestAPIProcessor implements Processor {
    @Inject
    private RestTemplate restTemplate;
    private static final int LIMIT = 100;
    private static final String REST_API = "<REST API URL>";

    @Override
    public void process(Exchange exchange) throws Exception {
        // Paging state travels between loop iterations as exchange headers.
        Integer offset = exchange.getIn().getHeader("offset", Integer.class);
        Integer count = exchange.getIn().getHeader("count", Integer.class);
        if (offset == null) offset = 0;
        if (count == null) count = 0;

        Map<String, Object> body = new LinkedHashMap<>();
        // offset advances by one per call, i.e. it is used as a page index here.
        body.put("offset", offset++);
        body.put("limit", LIMIT);
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        HttpEntity<?> entity = new HttpEntity<Object>(body, headers);
        ResponseEntity<String> responseEntity = restTemplate.exchange(REST_API, HttpMethod.POST, entity, String.class);
        String response = responseEntity.getBody();
        count += LIMIT;

        // Write to the In message only: calling getOut() creates a separate, empty
        // Out message, which would replace the "stopLoop" body set here.
        if (response == null || response.isEmpty()) {
            exchange.getIn().setBody("stopLoop");
        } else {
            exchange.getIn().setHeader("count", count);
            exchange.getIn().setHeader("offset", offset);
            exchange.getIn().setBody(response);
        }
    }
}
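The same "keep fetching until a page comes back empty" idea can be sketched without Camel or RestTemplate, which may help when testing the paging logic on its own. This is only a minimal illustration: PagedReader, readAll and fetchPage are hypothetical names, not part of Camel, and the in-memory list stands in for the REST API. For the JDBC case, fetchPage would presumably run a query with the database's own paging syntax (for example LIMIT/OFFSET, where supported).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper (not part of Camel): drains a paginated source page by page.
class PagedReader {

    // fetchPage takes (offset, limit) and returns that page;
    // a null or empty page is treated as "no more data".
    static <T> List<T> readAll(BiFunction<Integer, Integer, List<T>> fetchPage, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        List<T> all = new ArrayList<>();
        int offset = 0;
        while (true) {
            List<T> page = fetchPage.apply(offset, limit);
            if (page == null || page.isEmpty()) {
                break; // same stop condition that sets the "stopLoop" marker above
            }
            all.addAll(page);
            offset += page.size(); // advance by the number of records actually read
        }
        return all;
    }

    public static void main(String[] args) {
        // Stand-in for the REST API or SQL query: 250 in-memory records.
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            source.add(i);
        }
        List<Integer> result = readAll(
                (offset, limit) -> source.subList(
                        Math.min(offset, source.size()),
                        Math.min(offset + limit, source.size())),
                100);
        System.out.println(result.size());
    }
}
```

Note that this sketch advances the offset by the number of records read, whereas the processor above advances it by one per call; which of the two is right depends on whether the API expects a record offset or a page index.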