
Need help inferring an Avro schema for a JSON file in NiFi


I am trying to create a flow in NiFi that takes a valid JSON file and puts it directly into a Hive table using the PutHiveStreaming processor. My JSON looks something like the following:

{
"Raw_Json": {
    "SystemInfo": {
        "Id": "a string ID",
        "TM": null,
        "CountID": "a string ID",
        "Topic": null,
        "AccountID": "some number",
        "StationID": "some number",
        "STime": "some Timestamp",
        "ETime": "some Timestamp"
    },
    "Profile": {
        "ID": "ID number",
        "ProductID": "Some Number",
        "City": "City Name",
        "State": "State Name",
        "Number": "XXX-XXX-XXXX",
        "ExtNumber": null,
        "Unit": null,
        "Name": "Person Name",
        "Service": "Purchase",
        "AddrID": "00000000",
        "Products": {
            "Product": [{
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"

            },
            {
                "Code": "CODE",
                "Description": "some description"
            }]
        }
    },
    "Total": {
        "Amount": "some amount",
        "Delivery": "some address",
        "Estimate": "some amount",
        "Tax": null,
        "Delivery_Type": null

    }

},
"partition_date":"2017-05-19"

}

I read the JSON, infer a schema with the InferAvroSchema processor, convert the JSON to Avro using the inferred schema, and send the result to the PutHiveStreaming processor.

The main goal is to dump the whole "Raw_Json" object into one column of the Hive table, with the table partitioned by "partition_date", which will be the second column of the table. The problem is that NiFi fails to infer the nested JSON under "Raw_Json" and writes NULL into that column.

Does anyone know how I could make NiFi read the entire nested JSON under "Raw_Json" as one column and send it to the Hive table? How could I create my own Avro schema to do this? Any insight or ideas on how to fix this issue would be greatly appreciated!


Solution

  • Normally you only have to create or generate (infer) the Avro schema once, as long as the input file format is always the same: two fields, Raw_Json and partition_date.

    You should end up with something like this in a file, for example avro-schema.json:

    {
      "type" : "record",
      "name" : "test",
      "fields" : [ {
        "name" : "Raw_Json",
        "type" : 
        ...
      }, {
        "name" : "partition_date",
        "type" : "string",
        "doc" : "Type inferred from '\"2017-05-19\"'"
      } ]
    }
    

    Use this file as the Record Schema in the ConvertJSONToAvro processor.

    For the type of the Raw_Json column you have two options:

    Either define the complex data type fully, with all nested fields, arrays, and so on.

    Or, if you want to write the content of Raw_Json into a string column, convert it to a string before converting the file to Avro. You can use a sequence of EvaluateJsonPath and AttributesToJSON processors for this.
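
To make the second option concrete, here is a minimal Python sketch of the transformation that the EvaluateJsonPath and AttributesToJSON pair is meant to achieve. It is not NiFi code. The function name `flatten_raw_json`, the constant `FLAT_SCHEMA`, and the shortened sample document are hypothetical and used only for illustration. The sketch assumes the input always has the two top-level fields Raw_Json and partition_date.

```python
import json


def flatten_raw_json(document: str) -> str:
    """Return a flat JSON record whose Raw_Json value is a JSON string.

    Hypothetical helper: it mimics, outside NiFi, the result of extracting
    Raw_Json and partition_date and rebuilding a flat JSON document.
    """
    record = json.loads(document)
    flat = {
        # Serialize the nested object so it fits a single string column.
        "Raw_Json": json.dumps(record["Raw_Json"], separators=(",", ":")),
        "partition_date": record["partition_date"],
    }
    return json.dumps(flat)


# Avro schema matching the flattened record (both fields are plain strings).
# The record name "test" is taken from the schema shown in the answer.
FLAT_SCHEMA = {
    "type": "record",
    "name": "test",
    "fields": [
        {"name": "Raw_Json", "type": "string"},
        {"name": "partition_date", "type": "string"},
    ],
}

# Shortened, made-up sample in the shape of the question's input.
sample = (
    '{"Raw_Json": {"Total": {"Amount": "some amount", "Tax": null}},'
    ' "partition_date": "2017-05-19"}'
)

flat = json.loads(flatten_raw_json(sample))
print(json.dumps(FLAT_SCHEMA, indent=2))
print(flat["Raw_Json"])
```

With the record flattened this way, the nested nulls stay inside the string, so the schema needs no nested record, array, or union definitions for Raw_Json.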