I am trying to process a nested JSON and flatten it in Apache NiFi, with the help of the JoltTransformation processor by supplying a spec.
Sample JSON:
Input
{
"status": 0,
"body": {
"updatetime": 1667946041,
"timezone": "Europe/Luxembourg",
"measuregrps": [
{
"grpid": 3807008748,
"attrib": 0,
"date": 1660928128,
"created": 1660928165,
"modified": 1660928167,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"measures": [
{
"value": 62,
"type": 11,
"unit": 0,
"algo": 16909313,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 63,
"type": 135,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 143,
"type": 136,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 396,
"type": 137,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 402,
"type": 138,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
}
],
"comment": null
},
{
"grpid": 3801487301,
"attrib": 0,
"date": 1660725664,
"created": 1660725667,
"modified": 1660725667,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"measures": [
{
"value": 968,
"type": 54,
"unit": -1,
"algo": 33619969,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
}
],
"comment": null,
"is_inconclusive": false
},
{
"grpid": 3801485699,
"attrib": 0,
"date": 1660725563,
"created": 1660725596,
"modified": 1660725597,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"measures": [
{
"value": 72,
"type": 11,
"unit": 0,
"algo": 16909313,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 60,
"type": 135,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 150,
"type": 136,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 366,
"type": 137,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
},
{
"value": 400,
"type": 138,
"unit": 0,
"algo": 0,
"fm": 255,
"apppfmid": 9,
"appliver": 2421
}
],
"comment": null
},
{
"grpid": 3799078577,
"attrib": 2,
"date": 1660636784,
"created": 1660636789,
"modified": 1660636789,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"measures": [
{
"value": 61000,
"type": 1,
"unit": -3,
"algo": 0,
"fm": 255,
"apppfmid": 7,
"appliver": 5110101
}
],
"comment": null
},
{
"grpid": 3781281953,
"attrib": 2,
"date": 1659950922,
"created": 1659950922,
"modified": 1659950922,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"measures": [
{
"value": 6200,
"type": 1,
"unit": -2,
"algo": 0,
"fm": 255
}
],
"comment": null
},
{
"grpid": 3781281952,
"attrib": 2,
"date": 1659950922,
"created": 1659950922,
"modified": 1659950922,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"measures": [
{
"value": 1670,
"type": 4,
"unit": -3,
"algo": 0,
"fm": 255
}
],
"comment": null
}
]
}
}
Jolt Spec
[
{
"operation": "shift",
"spec": {
"body": {
"measuregrps": {
"*": {
"measures": {
"*": {
"*": "[&1].&"
}
},
"@(3,status)": "[&1].status",
"@(2,updatetime)": "[&1].updatetime",
"@(2,timezone)": "[&1].timezone",
"*": "[&1].&"
}
}
}
}
}
]
Output
[
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3807008748,
"attrib": 0,
"date": 1660928128,
"created": 1660928165,
"modified": 1660928167,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"value": [
62,
968,
72,
61000,
6200,
1670
],
"type": [
11,
54,
11,
1,
1,
4
],
"unit": [
0,
-1,
0,
-3,
-2,
-3
],
"algo": [
16909313,
33619969,
16909313,
0,
0,
0
],
"fm": [
255,
255,
255,
255,
255,
255
],
"apppfmid": [
9,
9,
9,
7
],
"appliver": [
2421,
2421,
2421,
5110101
],
"comment": null
},
{
"value": [
63,
60
],
"type": [
135,
135
],
"unit": [
0,
0
],
"algo": [
0,
0
],
"fm": [
255,
255
],
"apppfmid": [
9,
9
],
"appliver": [
2421,
2421
],
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3801487301,
"attrib": 0,
"date": 1660725664,
"created": 1660725667,
"modified": 1660725667,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"comment": null,
"is_inconclusive": false
},
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3801485699,
"attrib": 0,
"date": 1660725563,
"created": 1660725596,
"modified": 1660725597,
"category": 1,
"deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid": "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"comment": null
},
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3799078577,
"attrib": 2,
"date": 1660636784,
"created": 1660636789,
"modified": 1660636789,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"comment": null
},
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3781281953,
"attrib": 2,
"date": 1659950922,
"created": 1659950922,
"modified": 1659950922,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"comment": null
},
{
"status": 0,
"updatetime": 1667905592,
"timezone": "Europe/Luxembourg",
"grpid": 3781281952,
"attrib": 2,
"date": 1659950922,
"created": 1659950922,
"modified": 1659950922,
"category": 1,
"deviceid": null,
"hash_deviceid": null,
"comment": null
}
]
Expected Output
[ {
"status" : 0,
"updatetime" : 1667905592,
"timezone" : "Europe/Luxembourg",
"grpid" : 3807008748,
"attrib" : 0,
"date" : 1660928128,
"created" : 1660928165,
"modified" : 1660928167,
"category" : 1,
"deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"value" : 62,
"type" : 11,
"unit" : 0,
"algo" : 16909313,
"fm" : 255,
"apppfmid" : 9,
"appliver" : 2421,
"comment" : null
}, {
"status" : 0,
"updatetime" : 1667905592,
"timezone" : "Europe/Luxembourg",
"grpid" : 3807008748,
"attrib" : 0,
"date" : 1660928128,
"created" : 1660928165,
"modified" : 1660928167,
"category" : 1,
"deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"value" : 968,
"type" : 54,
"unit" : -1,
"algo" : 33619969,
"fm" : 255,
"apppfmid" : 9,
"appliver" : 2421,
"comment" : null
},
{
"value" : 63,
"type" : 135,
"unit" : 0,
"algo" : 0,
"fm" : 255,
"apppfmid" : 9,
"appliver" : 2421,
"status" : 0,
"updatetime" : 1667905592,
"timezone" : "Europe/Luxembourg",
"grpid" : 3801487301,
"attrib" : 0,
"date" : 1660725664,
"created" : 1660725667,
"modified" : 1660725667,
"category" : 1,
"deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"comment" : null,
"is_inconclusive" : false
},
{
"value" : 60,
"type" : 135,
"unit" : 0,
"algo" : 0,
"fm" : 255,
"apppfmid" : 9,
"appliver" : 2421,
"status" : 0,
"updatetime" : 1667905592,
"timezone" : "Europe/Luxembourg",
"grpid" : 3801487301,
"attrib" : 0,
"date" : 1660725664,
"created" : 1660725667,
"modified" : 1660725667,
"category" : 1,
"deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"hash_deviceid" : "01fc9191c2e72ba05397211f47ec0d0e3db71ac3",
"comment" : null,
"is_inconclusive" : false
}
]
So my expectation is to have flatten the JSON and have single values for each object, after which I plan to send this over to the ConvertJsontoSql processor within the NiFi to have the records inserted into PostgresDB.
https://jolt-demo.appspot.com/
Also, what does is_inconclusive
mean I can see it gets added to my output when using Jolt Transform
You can group the values by grpid
along with indexes of measures
array(while looping within this array's elements) such as
[
{
"operation": "shift",
"spec": {
"body": {
"measuregrps": {
"*": {
"measures": {
"*": {
"*": {
"@(6,status)": "@(4,grpid)[&2].status", // @(6,status) -> going the tree 6 levels up (to traverse "{" 6 times ) in order to reach the level of the attribute "status", "@(4,grpid) -> to traverse ":"(element on th right hand side) once, and "{" three times(total four level), [&2] -> to reach the level of indexes of "measures" array and produce results in array manner by using "[]" notation
"@(5,updatetime)": "@(4,grpid)[&2].updatetime",
"@(5,timezone)": "@(4,grpid)[&2].timezone",
"@(3,grpid)": "@(4,grpid)[&2].grpid",
"@(3,date)": "@(4,grpid)[&2].date",
"@(3,created)": "@(4,grpid)[&2].created",
"@(3,modified)": "@(4,grpid)[&2].modified",
"@(3,category)": "@(4,grpid)[&2].category",
"@(3,deviceid)": "@(4,grpid)[&2].deviceid",
"@(3,hash_deviceid)": "@(4,grpid)[&2].hash_deviceid",
"@": "@(4,grpid)[&2].&"
}
}
}
}
}
}
}
},
{
// get rid of object labels
"operation": "shift",
"spec": {
"*": {
"*": ""
}
}
},
{
// pick only single value from the repeating components of the array values
"operation": "cardinality",
"spec": {
"*": {
"*": "ONE"
}
}
}
]
which will yield a flattened result as needed.
P.S : I don't know and didn't encounter is_inconclusive
within the Jolt context.