I would like to implement the following logic in NiFi:
while(true) {
    prj_result = invoke_prj_Rest_EndPoint();
    prjs_list = prj_result["prjs"];
    for prj in prjs_list:
        emp_response = invoke_emp_rest_Endpoint(prj["id"]);
        next_cursor = emp_response["meta"]["next_cursor"];
        while next_cursor:
            sleep(10 secs);
            emps = emp_response["emps"];
            for emp in emps:
                emp["mod_time"] = now();
            loadEmpDataToS3()
    sleep(5Mins)
}
The approach I have so far:
For while(true), I initialize a parameter i = 1, check ${i:equals(1)}, and never increment it. Is there a better approach? For the sleeps, I am not sure what the best approach is. I am also not sure how to implement the "for prj in prjs_list" loop and the inner while loop.
GenerateFlowFile is already a kind of "while(true)": it generates a flowfile according to its schedule, so a 5 min run schedule covers the sleep(5Mins).
After the first InvokeHTTP, split the response by "prjs" into multiple flowfiles. If the response is JSON, you can use SplitJson.
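To illustrate what splitting by $.prjs produces, here is a minimal Python sketch (not NiFi code). It assumes the first response is a JSON object with a "prjs" array; the sample payload and the helper name split_by_prjs are made up for illustration.

```python
import json
from typing import List

def split_by_prjs(response_body: str) -> List[str]:
    """Mimic a split on $.prjs: one JSON document per array element."""
    doc = json.loads(response_body)
    # Each array element becomes the content of its own "flowfile".
    return [json.dumps(prj) for prj in doc.get("prjs", [])]

# Hypothetical sample response; the real payload shape is an assumption.
sample = '{"prjs": [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]}'
flowfiles = split_by_prjs(sample)
for content in flowfiles:
    print(content)
```

Each resulting record holds a single project, so the following steps can work on one "prj" at a time without an explicit for loop.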
After splitting, make the required transformation on each "prj" flowfile and then call InvokeHTTP again; in its scheduling settings you can set it to run every 10 sec.
So the flow could look like this:
1 GenerateFlowFile
1 --> 2 InvokeHTTP first
2 --> 3 SplitJson by $.prjs
3 --> 4 EvaluateJsonPath extract $.id (each split flowfile holds a single prj) into an attribute such as prj.id
4 --> 5 InvokeHTTP second with 10 sec schedule
5 --> 6 transform and put to S3
5 --> 7 EvaluateJsonPath extract $.meta.next_cursor
7 --> 8 RouteOnAttribute (next_cursor is not empty)
8 --> 5
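
The loop formed by steps 5, 7 and 8 can be sketched in plain Python (not NiFi code) to show the control flow. Everything named here is hypothetical: fetch_emps stands in for the second InvokeHTTP call, the cursor is assumed to be sent back on the next request, and the delay function is injectable so the sketch runs without actually waiting.

```python
import time
from datetime import datetime, timezone
from typing import Callable, Dict, Iterator, Optional

def page_through_emps(
    prj_id: int,
    fetch_emps: Callable[[int, Optional[str]], Dict],
    delay_secs: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[Dict]:
    """Yield every emp of one project, following meta.next_cursor."""
    cursor: Optional[str] = None
    while True:
        response = fetch_emps(prj_id, cursor)         # step 5: second call
        for emp in response.get("emps", []):          # step 6: transform
            emp["mod_time"] = datetime.now(timezone.utc).isoformat()
            yield emp
        cursor = response.get("meta", {}).get("next_cursor")  # step 7
        if not cursor:                                # step 8: cursor empty, stop
            break
        sleep(delay_secs)                             # wait before the next page

# Hypothetical in-memory stand-in for the emp endpoint: two pages.
PAGES = {
    None: {"emps": [{"name": "ann"}], "meta": {"next_cursor": "p2"}},
    "p2": {"emps": [{"name": "bob"}], "meta": {"next_cursor": None}},
}

def fake_fetch(prj_id: int, cursor: Optional[str]) -> Dict:
    """Return the canned page for the given cursor (prj_id is ignored here)."""
    return PAGES[cursor]

emps = list(page_through_emps(1, fake_fetch, sleep=lambda secs: None))
print([e["name"] for e in emps])
```

In NiFi the same effect comes from routing flowfiles with a non-empty next_cursor back to step 5, so no explicit while loop is needed.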