Using below mentioned NiFi Rest API endpoint and code snippet,
I am fetching a list of Remote Process Groups (RPG), iterating and fetching each RPG details. The problem is, I am getting inaccurate RPG data. If I hit this Endpoint (https://nifihost:8080/nifi-api/remote-process-groups/{id}
), I am receiving accurate details. Please clarify,
https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups
Vs.
https://nifihost:8080/nifi-api/remote-process-groups/{id}
)Endpoint:
https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups
Source Code
ArrayList<NifiRemoteProcessGroup> remoteProcessGroupArrayList = new ArrayList<>();
String returnedJSON = "";
String remoteProcessGroupURL = getNifiURL() + "/nifi-api/process-groups/" + processGroup + "/remote-process-groups";
HttpEntity httpEntity = RestCall.oAuthHeaders(token);
RestTemplate restTemplate = new RestTemplate();
try{
ResponseEntity<String> response = restTemplate.exchange(remoteProcessGroupURL,HttpMethod.GET,httpEntity,String.class);
returnedJSON = response.getBody();
}
catch(Exception e){
logger.error("There was an error retrieving the remote-process-groups : " + e.getMessage());
}
try{
ObjectMapper objectMapper = new ObjectMapper();
JsonNode rootNode = objectMapper.readTree(returnedJSON);
JsonNode processorNode = rootNode.path("remoteProcessGroups");
Iterator<JsonNode> elements = processorNode.elements();
while(elements.hasNext()){
JsonNode remoteProcessGroup = elements.next();
JsonNode statusElement = remoteProcessGroup.path("status");
JsonNode bulletinElement = remoteProcessGroup.path("bulletins");
JsonNode componentElement = remoteProcessGroup.path("component");
JsonNode aggregateSnapshot = statusElement.path("aggregateSnapshot");
NifiRemoteProcessGroup remoteProcessGroupInstance = new NifiRemoteProcessGroup();
remoteProcessGroupInstance.setRemoteProcessGroupId(checkExists(statusElement,"id"));
remoteProcessGroupInstance.setRemoteProcessGroupName(checkExists(componentElement,"name"));
remoteProcessGroupInstance.setRemoteProcessGroupGroupId(checkExists(statusElement,"groupId"));
remoteProcessGroupInstance.setRemoteProcessGroupTargetURL(checkExists(componentElement,"targetUri"));
remoteProcessGroupInstance.setRemoteProcessGroupBulletins(bulletinElement.asText());
remoteProcessGroupInstance.setRemoteProcessGroupTransmitting(Boolean.valueOf(checkExists(componentElement,"transmitting")));
remoteProcessGroupInstance.setRemoteProcessGroupTransmissionStatus(checkExists(statusElement,"transmissionStatus"));
remoteProcessGroupInstance.setRemoteProcessGroupActiveThreadCount(Double.valueOf(checkExists(aggregateSnapshot,"activeThreadCount")));
remoteProcessGroupInstance.setRemoteProcessGroupFlowFilesReceived(Double.valueOf(checkExists(aggregateSnapshot,"flowFilesReceived")));
remoteProcessGroupInstance.setRemoteProcessGroupBytesReceived(Double.valueOf(checkExists(aggregateSnapshot,"bytesReceived")));
remoteProcessGroupInstance.setRemoteProcessGroupReceived(checkExists(aggregateSnapshot,"received"));
remoteProcessGroupArrayList.add(remoteProcessGroupInstance);
}
}
catch(Exception e){
logger.info("There was an error creating the list of remote process groups: " + e.getMessage());
}
I maintain the nominal Python client for NiFi, given the outcome you mention seeking I suggest you could try:
import nipyapi
nipyapi.utils.set_endpoint('http://localhost:8080/nifi')
rpg_info = [nipyapi.canvas.get_remote_process_group(rpg.id) for rpg in nipyapi.canvas.list_all_remote_process_groups('root', True)]
The RPG info returned will give you the parent ProcessGroup ID under .component.parent_group_id, allowing you to reconstruct the tree, but you should find it much more performant than seeking each individually.