google-bigquery, google-cloud-storage, apache-nifi

How to convert JSON to CSV and send it to BigQuery or a Google Cloud Storage bucket


I'm new to NiFi and I want to convert a large amount of JSON data to CSV format. This is what I am doing at the moment, but it is not giving the expected result.

These are the steps:

Processors to create the access_token and send the request body using InvokeHTTP (this part works fine, so I won't name those processors since they give the expected result), getting the response body in JSON.

Example of the JSON response:

    [
   {
      "results":[
         {
            "customer":{
               "resourceName":"customers/123456789",
               "id":"11111111"
            },
            "campaign":{
               "resourceName":"customers/123456789/campaigns/222456422222",
               "name":"asdaasdasdad",
               "id":"456456546546"
            },
            "adGroup":{
               "resourceName":"customers/456456456456/adGroups/456456456456",
               "id":"456456456546",
               "name":"asdasdasdasda"
            },
            "metrics":{
               "clicks":"11",
               "costMicros":"43068982",
               "impressions":"2079"
            },
            "segments":{
               "device":"DESKTOP",
               "date":"2021-11-22"
            },
            "incomeRangeView":{
               "resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
            }
         },
            {
            "customer":{
               "resourceName":"customers/123456789",
               "id":"11111111"
            },
            "campaign":{
               "resourceName":"customers/123456789/campaigns/222456422222",
               "name":"asdasdasdasd",
               "id":"456456546546"
            },
            "adGroup":{
               "resourceName":"customers/456456456456/adGroups/456456456456",
               "id":"456456456546",
               "name":"asdasdasdas"
            },
            "metrics":{
               "clicks":"11",
               "costMicros":"43068982",
               "impressions":"2079"
            },
            "segments":{
               "device":"DESKTOP",
               "date":"2021-11-22"
            },
            "incomeRangeView":{
               "resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
            }
         },
....etc....
      ]
   }
]

Now I am using: ==>> SplitJson ($[*].results[*]) ==>> JoltTransformJSON with this spec:

[{
    "operation": "shift",
    "spec": {

        "customer": {
            "id": "customer_id"
        },
        "campaign": {
            "id": "campaign_id",
            "name": "campaign_name"
        },
        "adGroup": {
            "id": "ad_group_id",
            "name": "ad_group_name"

        },
        "metrics": {
            "clicks": "clicks",
            "costMicros": "cost",
            "impressions": "impressions"
        },
        "segments": {
            "device": "device",
            "date": "date"
        },
        "incomeRangeView": {
            "resourceName": "keywords_id"
        }
    }
}]
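
After this shift each split FlowFile ends up holding one flat object, roughly like this (values taken from the first entry of the sample response above):

    {
      "customer_id": "11111111",
      "campaign_id": "456456546546",
      "campaign_name": "asdaasdasdad",
      "ad_group_id": "456456456546",
      "ad_group_name": "asdasdasdasda",
      "clicks": "11",
      "cost": "43068982",
      "impressions": "2079",
      "device": "DESKTOP",
      "date": "2021-11-22",
      "keywords_id": "customers/456456456/incomeRangeViews/456456546~456456456"
    }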

==>> MergeContent (here is the problem, which I don't know how to fix):
    Merge Strategy: Defragment
    Merge Format: Binary Concatenation
    Attribute Strategy: Keep Only Common Attributes
    Maximum number of Bins: 5 (I tried 10, same result)
    Delimiter Strategy: Text
    Header: [
    Footer: ]
    Demarcator: ,
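
With that Header/Footer/Demarcator the merged FlowFile becomes a JSON array of those flattened objects again, something like:

    [
      { "customer_id": "11111111", "campaign_id": "456456546546", ... },
      { "customer_id": "11111111", "campaign_id": "456456546546", ... },
      ...
    ]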

What result do I get? I get a JSON file that only holds part of the JSON data. Example: I have 50k customer_ids in one JSON file, and I would like to send this data into a BigQuery table and have all the ids under the same field "customer_id".

MergeContent picks up the split JSON files and combines them, but I still get 10k customer_ids per file, i.e. I end up with 5 files instead of 1 file with 50k customer_ids.

After the MergeContent I use ==>> ConvertRecord with these settings:
    Record Reader: JsonTreeReader (Schema Access Strategy: Infer Schema)
    Record Writer: CSVRecordSetWriter
        Schema Write Strategy: Do Not Write Schema
        Schema Access Strategy: Inherit Record Schema
        CSV Format: Microsoft Excel
        Include Header Line: true
        Character Set: UTF-8
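
So each merged file comes out as CSV along these lines (the header line plus one row per result; the column order depends on the inferred schema):

    customer_id,campaign_id,campaign_name,ad_group_id,ad_group_name,clicks,cost,impressions,device,date,keywords_id
    11111111,456456546546,asdaasdasdad,456456456546,asdasdasdasda,11,43068982,2079,DESKTOP,2021-11-22,customers/456456456/incomeRangeViews/456456546~456456456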

==>> UpdateAttribute (custom property: filename: ${filename}.csv) ==>> PutGCSObject (this puts the data into the Google Cloud Storage bucket; this step works fine, I am able to put files there).

With this approach I am UNABLE to send data to BigQuery. (After MergeContent I tried using PutBigQueryBatch and used this command in the bq shell to get the schema I need:

bq show --format=prettyjson some_data_set.some_table_in_that_data_set | jq '.schema.fields' 

I filled in all the fields as needed, and for Load file type I tried NEWLINE_DELIMITED_JSON, or CSV when I had converted the data to CSV. I am not getting errors, but no data is uploaded into the table.)
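
For reference, the fields array that the bq show command prints looks roughly like this for the flattened layout (assuming here that all columns are NULLABLE STRINGs; the real table may differ):

    [
      { "name": "customer_id", "type": "STRING", "mode": "NULLABLE" },
      { "name": "campaign_id", "type": "STRING", "mode": "NULLABLE" },
      { "name": "campaign_name", "type": "STRING", "mode": "NULLABLE" },
      ...
    ]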

What am I doing wrong? I basically want to map the data in such a way that each field's data ends up under the same field name.


Solution

  • The trick you are missing is using Records.

    Instead of using X>SplitJson>JoltTransformJSON>Merge>Convert>X, try just X>JoltTransformRecord>X with a JSON Reader and a CSV Writer. This cuts out a lot of inefficiency.

    If you really need to split (and you should avoid splitting and merging unless totally necessary), you can use MergeRecord instead - again with a JSON Reader and CSV Writer. This would make your flow X>Split>Jolt>MergeRecord>X.
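
    A rough sketch of what that JoltTransformRecord spec could look like, assuming the JsonTreeReader hands each element of the top-level array (the object holding results) to the transform as one record, and that an array result is written out as one output record per element (worth verifying on your NiFi version). The output field names just mirror the spec from the question:

        [{
          "operation": "shift",
          "spec": {
            "results": {
              "*": {
                "customer": { "id": "[&2].customer_id" },
                "campaign": { "id": "[&2].campaign_id", "name": "[&2].campaign_name" },
                "adGroup": { "id": "[&2].ad_group_id", "name": "[&2].ad_group_name" },
                "metrics": { "clicks": "[&2].clicks", "costMicros": "[&2].cost", "impressions": "[&2].impressions" },
                "segments": { "device": "[&2].device", "date": "[&2].date" },
                "incomeRangeView": { "resourceName": "[&2].keywords_id" }
              }
            }
          }
        }]

    Paired with a JsonTreeReader (Infer Schema) and a CSVRecordSetWriter that writes the header line, the processor's output is already the CSV that the Split/Jolt/Merge/Convert chain was building.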