Search code examples
springapirestapache-nifi

Issue with NiFi Rest API in fetching Remote Process Group details


Using below mentioned NiFi Rest API endpoint and code snippet,

I am fetching a list of Remote Process Groups (RPG), iterating and fetching each RPG details. The problem is, I am getting inaccurate RPG data. If I hit this Endpoint (https://nifihost:8080/nifi-api/remote-process-groups/{id}), I am receiving accurate details. Please clarify,

  1. Why there is discrepancy between the results of these two end-points? (https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups Vs. https://nifihost:8080/nifi-api/remote-process-groups/{id})
  2. As my requirement is to iterate via each Process Group, getting a list of Remote Process Groups (RPG) within it and fetching each RPG details? What is the right way to achieve this?

Endpoint:

https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups

Source Code

    ArrayList<NifiRemoteProcessGroup> remoteProcessGroupArrayList = new ArrayList<>();
    String returnedJSON = "";
    String remoteProcessGroupURL =  getNifiURL() + "/nifi-api/process-groups/" + processGroup + "/remote-process-groups";

    HttpEntity httpEntity = RestCall.oAuthHeaders(token);
    RestTemplate restTemplate = new RestTemplate();

    try{
        ResponseEntity<String> response = restTemplate.exchange(remoteProcessGroupURL,HttpMethod.GET,httpEntity,String.class);
        returnedJSON = response.getBody();
    }
    catch(Exception e){
        logger.error("There was an error retrieving the remote-process-groups : " + e.getMessage());
    }

    try{
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode rootNode = objectMapper.readTree(returnedJSON);
        JsonNode processorNode = rootNode.path("remoteProcessGroups");
        Iterator<JsonNode> elements = processorNode.elements();

        while(elements.hasNext()){
            JsonNode remoteProcessGroup = elements.next();
            JsonNode statusElement = remoteProcessGroup.path("status");
            JsonNode bulletinElement = remoteProcessGroup.path("bulletins");
            JsonNode componentElement = remoteProcessGroup.path("component");
            JsonNode aggregateSnapshot = statusElement.path("aggregateSnapshot");

            NifiRemoteProcessGroup remoteProcessGroupInstance = new NifiRemoteProcessGroup();
            remoteProcessGroupInstance.setRemoteProcessGroupId(checkExists(statusElement,"id"));
            remoteProcessGroupInstance.setRemoteProcessGroupName(checkExists(componentElement,"name"));
            remoteProcessGroupInstance.setRemoteProcessGroupGroupId(checkExists(statusElement,"groupId"));
            remoteProcessGroupInstance.setRemoteProcessGroupTargetURL(checkExists(componentElement,"targetUri"));
            remoteProcessGroupInstance.setRemoteProcessGroupBulletins(bulletinElement.asText());
            remoteProcessGroupInstance.setRemoteProcessGroupTransmitting(Boolean.valueOf(checkExists(componentElement,"transmitting")));
            remoteProcessGroupInstance.setRemoteProcessGroupTransmissionStatus(checkExists(statusElement,"transmissionStatus"));
            remoteProcessGroupInstance.setRemoteProcessGroupActiveThreadCount(Double.valueOf(checkExists(aggregateSnapshot,"activeThreadCount")));
            remoteProcessGroupInstance.setRemoteProcessGroupFlowFilesReceived(Double.valueOf(checkExists(aggregateSnapshot,"flowFilesReceived")));
            remoteProcessGroupInstance.setRemoteProcessGroupBytesReceived(Double.valueOf(checkExists(aggregateSnapshot,"bytesReceived")));
            remoteProcessGroupInstance.setRemoteProcessGroupReceived(checkExists(aggregateSnapshot,"received"));
            remoteProcessGroupArrayList.add(remoteProcessGroupInstance);
        }
    }
    catch(Exception e){
        logger.info("There was an error creating the list of remote process groups: " + e.getMessage());
    }

Solution

    1. 'process-groups/{id}/remote-process-groups' is part of the ProcessGroupsAPI subsection, and will return a RemoteProcessGroupsEntity, which contains a listing of the Remote Process Groups bounded with the ProcessGroup of the ID you submit.
    2. 'remote-process-groups/{id}' is part of the RemoteProcessGroups API, and will fetch the exact RemoteProcessGroupEntity (note the lack of plural) requested.

    I maintain the nominal Python client for NiFi, given the outcome you mention seeking I suggest you could try:

    import nipyapi
    nipyapi.utils.set_endpoint('http://localhost:8080/nifi')
    rpg_info = [nipyapi.canvas.get_remote_process_group(rpg.id) for rpg in nipyapi.canvas.list_all_remote_process_groups('root', True)]
    

    The RPG info returned will give you the parent ProcessGroup ID under .component.parent_group_id, allowing you to reconstruct the tree, but you should find it much more performant than seeking each individually.