I'm new to NiFi and I want to convert a large amount of JSON data to CSV. This is what I am doing at the moment, but it does not give the expected result.
These are the steps:
First, a few processors create an access_token and send the request body using InvokeHTTP, and I get the response body back as JSON. (This part works fine, so I won't name those processors.)
Example of the JSON response:
[
  {
    "results": [
      {
        "customer": {
          "resourceName": "customers/123456789",
          "id": "11111111"
        },
        "campaign": {
          "resourceName": "customers/123456789/campaigns/222456422222",
          "name": "asdaasdasdad",
          "id": "456456546546"
        },
        "adGroup": {
          "resourceName": "customers/456456456456/adGroups/456456456456",
          "id": "456456456546",
          "name": "asdasdasdasda"
        },
        "metrics": {
          "clicks": "11",
          "costMicros": "43068982",
          "impressions": "2079"
        },
        "segments": {
          "device": "DESKTOP",
          "date": "2021-11-22"
        },
        "incomeRangeView": {
          "resourceName": "customers/456456456/incomeRangeViews/456456546~456456456"
        }
      },
      {
        "customer": {
          "resourceName": "customers/123456789",
          "id": "11111111"
        },
        "campaign": {
          "resourceName": "customers/123456789/campaigns/222456422222",
          "name": "asdasdasdasd",
          "id": "456456546546"
        },
        "adGroup": {
          "resourceName": "customers/456456456456/adGroups/456456456456",
          "id": "456456456546",
          "name": "asdasdasdas"
        },
        "metrics": {
          "clicks": "11",
          "costMicros": "43068982",
          "impressions": "2079"
        },
        "segments": {
          "device": "DESKTOP",
          "date": "2021-11-22"
        },
        "incomeRangeView": {
          "resourceName": "customers/456456456/incomeRangeViews/456456546~456456456"
        }
      },
      ....etc....
    ]
  }
]
Now I am using: ==> SplitJson ($[].results[]) ==> JoltTransformJSON with this spec:
[{
  "operation": "shift",
  "spec": {
    "customer": {
      "id": "customer_id"
    },
    "campaign": {
      "id": "campaign_id",
      "name": "campaign_name"
    },
    "adGroup": {
      "id": "ad_group_id",
      "name": "ad_group_name"
    },
    "metrics": {
      "clicks": "clicks",
      "costMicros": "cost",
      "impressions": "impressions"
    },
    "segments": {
      "device": "device",
      "date": "date"
    },
    "incomeRangeView": {
      "resourceName": "keywords_id"
    }
  }
}]
==>> MergeContent (here is the problem, which I don't know how to fix):
Merge Strategy: Defragment
Merge Format: Binary Concatenation
Attribute Strategy: Keep Only Common Attributes
Maximum number of Bins: 5 (I tried 10, same result)
Delimiter Strategy: Text
Header: [
Footer: ]
Demarcator: ,
What is the result I get? I get JSON files that each contain only part of the data. Example: I have 50k customer_ids in one JSON response, and I would like to send this data to a BigQuery table with all the ids under the same field, "customer_id".
MergeContent takes the split JSON files and combines them, but I still get 10k customer_ids per file, i.e. I end up with 5 files instead of 1 file with 50k customer_ids.
After the MergeContent I use ==>> ConvertRecord with these settings:
Record Reader: JsonTreeReader (Schema Access Strategy: Infer Schema)
Record Writer: CSVRecordSetWriter (Schema Write Strategy: Do Not Write Schema, Schema Access Strategy: Inherit Record Schema, CSV Format: Microsoft Excel, Include Header Line: true, Character Set: UTF-8)
==>> UpdateAttribute (custom property filename: ${filename}.csv) ==>> PutGCSObject (puts the data into the Google bucket; this step works fine and I am able to put files there)
With this approach I am UNABLE to send data to BigQuery. After MergeContent I tried using PutBigQueryBatch, and I used this command in the bq shell to get the schema I need:
bq show --format=prettyjson some_data_set.some_table_in_that_data_set | jq '.schema.fields'
I filled in all the fields as needed. For Load file type I tried NEWLINE_DELIMITED_JSON, or CSV when I had converted the data to CSV. I am not getting errors, but no data is uploaded into the table.
What am I doing wrong? I basically want to map the data so that each field's values end up under the same field name.
The trick you are missing is using Records.
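Instead of using X > SplitJson > JoltTransformJSON > MergeContent > ConvertRecord > X, try just X > JoltTransformRecord > X with a JSON Reader and a CSV Writer. This avoids the overhead of splitting the response into one flow file per result and merging them back together.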
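To make the target concrete, here is a minimal Python sketch of the output a single record-oriented pass is expected to produce. It is not NiFi or Jolt code, only an illustration: FIELD_MAP restates the column mapping from the shift spec in the question, and the helper names flatten and response_to_csv are made up for this example.

```python
import csv
import io
import json

# Column mapping copied from the shift spec in the question:
# (parent object, field) -> output column name.
FIELD_MAP = {
    ("customer", "id"): "customer_id",
    ("campaign", "id"): "campaign_id",
    ("campaign", "name"): "campaign_name",
    ("adGroup", "id"): "ad_group_id",
    ("adGroup", "name"): "ad_group_name",
    ("metrics", "clicks"): "clicks",
    ("metrics", "costMicros"): "cost",
    ("metrics", "impressions"): "impressions",
    ("segments", "device"): "device",
    ("segments", "date"): "date",
    ("incomeRangeView", "resourceName"): "keywords_id",
}


def flatten(result):
    """Flatten one entry of "results" into a single-level dict."""
    return {
        column: result.get(parent, {}).get(field)
        for (parent, field), column in FIELD_MAP.items()
    }


def response_to_csv(response_text):
    """Turn the whole response into one CSV string, without splitting it."""
    pages = json.loads(response_text)
    rows = [flatten(r) for page in pages for r in page.get("results", [])]
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=list(FIELD_MAP.values()))
    writer.writeheader()  # one header line for the whole file
    writer.writerows(rows)
    return out.getvalue()
```

Every result becomes one row and every id lands in the same customer_id column, which is the shape the question asks for. In NiFi the reader and writer services do this work, so no intermediate flow files are needed.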
If you really need to split (and you should avoid splitting and merging unless totally necessary), you can use MergeRecord instead of MergeContent, again with a JSON Reader and CSV Writer. This would make your flow X > SplitJson > JoltTransformJSON > MergeRecord > X.
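As a rough illustration of what merging at the record level means, the Python sketch below parses each JSON fragment as a record and writes all of them through a single CSV writer, so the result has one header and one row per fragment. This is only an analogy for the JSON Reader plus CSV Writer pairing, not NiFi code. The function name merge_records and the second sample row are made up, and the sketch assumes every fragment has the same keys.

```python
import csv
import io
import json


def merge_records(fragments):
    """Read each JSON fragment as a record and write them all to one CSV."""
    records = [json.loads(fragment) for fragment in fragments]
    if not records:
        return ""
    out = io.StringIO()
    # Columns come from the first record (assumes uniform fragments).
    writer = csv.DictWriter(out, fieldnames=list(records[0].keys()))
    writer.writeheader()
    writer.writerows(records)
    return out.getvalue()


# Two flattened fragments, as they might look after the Jolt step
# (the second row's values are illustrative only).
fragments = [
    '{"customer_id": "11111111", "clicks": "11", "device": "DESKTOP"}',
    '{"customer_id": "11111111", "clicks": "7", "device": "MOBILE"}',
]
print(merge_records(fragments))
```

Because the merge works on parsed records rather than raw bytes, there is no need for the [ header, ] footer and , demarcator that the MergeContent setup relies on.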