
NiFi RecordReader & RecordWriter serialization error: IllegalTypeConversionException, "Cannot convert value of class ... because the type is not supported"


I'm trying to convert JSON to JSON using JoltTransformRecord in Apache NiFi. When I run the transform at https://jolt-demo.appspot.com/, I get the correct result, so that part is fine.

But when I run the same transform with JoltTransformRecord, it throws an exception. The error is: "Cannot convert value of class [Ljava.lang.Object; because the type is not supported". I don't understand why I'm getting this error. I checked my input and output schemas and found nothing wrong; they look correct.

My input and output JSON examples, Jolt specification, and input and output schemas are given below. I'm using JsonTreeReader and JsonRecordSetWriter.

--- How can I solve this problem? ---

Example input JSON for JoltTransformRecord (in this example there is only one JSON object inside the array, but in practice there are many):

[ {
  "uuid" : "MFMS1-MC5",
  "componentId" : "path1",
  "Samples" : {
    "PathFeedrate" : [ {
      "dataItemId" : "pf",
      "timestamp" : "2019-03-01T21:48:27.940558Z",
      "sequence" : "68104",
      "value" : "425.5333",
      "name" : "Fact",
      "subType" : "ACTUAL"
    }, {
      "dataItemId" : "pf",
      "timestamp" : "2019-03-01T21:48:30.244219Z",
      "sequence" : "68117",
      "value" : "0",
      "name" : "Fact",
      "subType" : "ACTUAL"
    } ]
  },
  "Events" : {
    "SequenceNumber" : [ {
      "dataItemId" : "seq",
      "timestamp" : "2019-03-01T21:48:27.940558Z",
      "sequence" : "68105",
      "value" : "0",
      "name" : "sequenceNum"
    } ],
    "Unit" : [ {
      "dataItemId" : "unit",
      "timestamp" : "2019-03-01T21:48:27.940558Z",
      "sequence" : "68106",
      "value" : "13",
      "name" : "unitNum"
    } ]
  }
}]

Sample output JSON I want:

{
  "DataItems" : [ {
    "uuid" : "MFMS1-MC5",
    "componentId" : "path1",
    "eventType" : "Samples",
    "type" : "PathFeedrate",
    "dataItemId" : "pf",
    "timestamp" : "2019-03-01T21:48:27.940558Z",
    "sequence" : "68104",
    "value" : "425.5333",
    "name" : "Fact",
    "subType" : "ACTUAL"
  }, {
    "uuid" : "MFMS1-MC5",
    "componentId" : "path1",
    "eventType" : "Samples",
    "type" : "PathFeedrate",
    "dataItemId" : "pf",
    "timestamp" : "2019-03-01T21:48:30.244219Z",
    "sequence" : "68117",
    "value" : "0",
    "name" : "Fact",
    "subType" : "ACTUAL"
  }, {
    "uuid" : "MFMS1-MC5",
    "componentId" : "path1",
    "eventType" : "Events",
    "type" : "SequenceNumber",
    "dataItemId" : "seq",
    "timestamp" : "2019-03-01T21:48:27.940558Z",
    "sequence" : "68105",
    "value" : "0",
    "name" : "sequenceNum"
  }, {
    "uuid" : "MFMS1-MC5",
    "componentId" : "path1",
    "eventType" : "Events",
    "type" : "Unit",
    "dataItemId" : "unit",
    "timestamp" : "2019-03-01T21:48:27.940558Z",
    "sequence" : "68106",
    "value" : "13",
    "name" : "unitNum"
  } ]
}

My Jolt specification:

[
  {
    "operation": "shift",
    "spec": {
      "Samples": {
        "*": {
          "*": {
            "@(3,uuid)": "Items.&2[#2].uuid",
            "@(3,componentId)": "Items.&2[#2].componentId",
            "$2": "Items.&2[#2].eventType",
            "$1": "Items.&2[#2].type",
            "*": "Items.&2[#2].&"
          }
        }
      },
      "Events": {
        "*": {
          "*": {
            "@(3,uuid)": "Items.&2[#2].uuid",
            "@(3,componentId)": "Items.&2[#2].componentId",
            "$2": "Items.&2[#2].eventType",
            "$1": "Items.&2[#2].type",
            "*": "Items.&2[#2].&"
          }
        }
      },
      "Condition": {
        "*": {
          "*": {
            "@(3,uuid)": "Items.&2[#2].uuid",
            "@(3,componentId)": "Items.&2[#2].componentId",
            "$2": "Items.&2[#2].eventType",
            "$1": "Items.&2[#2].value",
            "*": "Items.&2[#2].&"
          }
        }
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "Items": {
        "*": {
          "*": "DataItems[]"
        }
      }
    }
  }
]

This specification works correctly; I have tested it in the Jolt transform demo.

I'm using JsonTreeReader to read the JSON in JoltTransformRecord. This is my input schema:

{
    "name": "Items",
    "namespace": "Items",
    "type": "record",
    "fields": [
        {
            "name": "uuid",
            "type": "string"
        },
        {
            "name": "componentId",
            "type": "string"
        },
        {
            "name": "Samples",
            "type": ["null", {
                "type": "map",
                "values": {
                    "type": "array",
                    "items": {
                        "name": "SamplesDataItem",
                        "type": "record",
                        "fields": [
                            {
                                "name": "dataItemId",
                                "type": "string"
                            },
                            {
                                "name": "timestamp",
                                "type": "string"
                            },
                            {
                                "name": "sequence",
                                "type": "string"
                            },
                            {
                                "name": "value",
                                "type": "string"
                            },
                            {
                                "name": "name",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "subType",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "sampleRate",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "statistic",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "duration",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "sampleCount",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "compositionId",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "resetTriggered",
                                "type": ["null", "string"]
                            }
                        ]
                    }
                }
            }]
        },
        {
            "name": "Events",
            "type": ["null", {
                "type": "map",
                "values": {
                    "type": "array",
                    "items": {
                        "name": "EventsDataItem",
                        "type": "record",
                        "fields": [
                            {
                                "name": "dataItemId",
                                "type": "string"
                            },
                            {
                                "name": "timestamp",
                                "type": "string"
                            },
                            {
                                "name": "sequence",
                                "type": "string"
                            },
                            {
                                "name": "value",
                                "type": "string"
                            },
                            {
                                "name": "name",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "subType",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "compositionId",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "resetTriggered",
                                "type": ["null", "string"]
                            }
                        ]
                    }
                }
            }]
        },
        {
            "name": "Condition",
            "type": ["null", {
                "type": "map",
                "values": {
                    "type": "array",
                    "items": {
                        "name": "ConditionDataItem",
                        "type": "record",
                        "fields": [
                            {
                                "name": "dataItemId",
                                "type": "string"
                            },
                            {
                                "name": "timestamp",
                                "type": "string"
                            },
                            {
                                "name": "type",
                                "type": "string"
                            },
                            {
                                "name": "sequence",
                                "type": "string"
                            },
                            {
                                "name": "name",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "subType",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "nativeCode",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "nativeSeverity",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "qualifier",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "statistic",
                                "type": ["null", "string"]
                            },
                            {
                                "name": "compositionId",
                                "type": ["null", "string"]
                            }
                        ]
                    }
                }
            }]
        }
    ]
}

I'm using JsonRecordSetWriter to write the converted result in JoltTransformRecord. This is my output schema:

{
    "name": "Items",
    "type": "record",
    "namespace": "Items",
    "fields": [
        {
            "name": "DataItems",
            "type": {
                "type": "array",
                "items": {
                    "name": "DataItems",
                    "type": "record",
                    "fields": [
                        {
                            "name": "uuid",
                            "type": "string"
                        },
                        {
                            "name": "componentId",
                            "type": "string"
                        },
                        {
                            "name": "eventType",
                            "type": "string"
                        },
                        {
                            "name": "type",
                            "type": "string"
                        },
                        {
                            "name": "dataItemId",
                            "type": "string"
                        },
                        {
                            "name": "timestamp",
                            "type": "string"
                        },
                        {
                            "name": "value",
                            "type": "string"
                        },
                        {
                            "name": "name",
                            "type": ["null", "string"],
                            "default": null
                        },
                        {
                            "name": "subType",
                            "type": ["null", "string"],
                            "default": null
                        }
                    ]
                }
            }
        }
    ]
} 

Solution

  • This is indeed a bug in the record handling utilities. I have written NIFI-6105 to cover the fix. Good catch!

    As a workaround, since you have JSON as both input and output, you can use JoltTransformJson instead of JoltTransformRecord. Alternatively, if you know the keys in the map (PathFeedrate, for example), you can change the schema to treat it as a record rather than a map; that might get you around the bug.
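
    To illustrate the second workaround, here is a rough sketch of how the "Samples" field of the input schema might look as a record with a known key instead of a map. This is only an assumption-laden example: the record name "SamplesRecord" is hypothetical, only the "PathFeedrate" key from the sample input is listed, and the remaining optional fields of the original "SamplesDataItem" are left out for brevity. Every other key you expect would need its own field, and "Events" and "Condition" would need the same treatment. Whether this avoids the bug in your NiFi version is not guaranteed.

```json
{
    "name": "Samples",
    "doc": "Illustrative assumption: Samples modeled as a record with known keys instead of a map",
    "type": ["null", {
        "name": "SamplesRecord",
        "doc": "Hypothetical record name, not part of the original schema",
        "type": "record",
        "fields": [
            {
                "name": "PathFeedrate",
                "type": ["null", {
                    "type": "array",
                    "items": {
                        "name": "SamplesDataItem",
                        "type": "record",
                        "fields": [
                            { "name": "dataItemId", "type": "string" },
                            { "name": "timestamp", "type": "string" },
                            { "name": "sequence", "type": "string" },
                            { "name": "value", "type": "string" },
                            { "name": "name", "type": ["null", "string"] },
                            { "name": "subType", "type": ["null", "string"] }
                        ]
                    }
                }],
                "default": null
            }
        ]
    }]
}
```

    This fragment would replace the existing "Samples" entry in the "fields" list of the input schema. The trade-off is that keys not listed in the record are not described by the schema, so this only fits when the set of keys is known in advance.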