Search code examples
mongodbaggregation-frameworkapache-nifi

NiFi's 'Run Mongo Aggregation' showing JSONMappingException: Unrecognized Token ISODate


Below is my sample data: (Parent JSON having floor information and every floor having multiple sensor information)

{"Floor_Id": "Galileo_001",
    "name": "Forklifts",
    "Sensor_data": [{
            "Floor_Id": "Galileo_001",
            "Floor_name": "Forklifts",
            "Name": "forkLift_002",
            "Asset_Id": 123,
            "Load": 1.7096133,
            "Timestamp": 1537878750996
        },
        {
            "Floor_Id": "Galileo_001",
            "Floor_name": "Forklifts",
            "Name": "forkLift_003",
            "Asset_Id": 456,
            "Load": 1.7096133,
            "Timestamp": 1537878750996,
        },
        {
            "Floor_Id": "Galileo_001",
            "Floor_name": "Forklifts",
            "Name": "forkLift_005 ",
            "Asset_Id": 127,
            "Load": 1.7096133,
            "Timestamp": 1537878750996
        },
        {
            "Floor_Id": "Galileo_001",
            "Floor_name": "Forklifts",
            "Name": "forkLift_001",
            "Asset_Id": 157,
            "Load": 1.7096133,
            "Timestamp": 1537878750996,
        }
]}

I want to calculate total load, total distance and active sensors for a floor. And the similar information (total load and total distance for every floor) on a per day basis. The desired response is like this:

{
    "TotalLoad": 3214,
    "Active_no_of_sensor":5,
    "Floor_Id": "Galileo_001",
    "LoadUnit": "Kgs",
    "AssetStatus": [{
        "TotalLoad": 200,
        "LoadUnit": "Kgs",
        "Date": "1539588994"
    }, {
        "TotalLoad": 400,
        "LoadUnit": "Kgs",
        "Date": "1539475200"
    }, {
        "TotalLoad": 100,
        "LoadUnit": "Kgs",
        "Date": "1539388800"
    }]
}

In NiFi, I am having two separate run mongo aggregation processors.

  1. For calculating total load and total no of active sensors for every floor.
  2. For calculating total load etc. for every floor per day basis.

For this 2nd condition, I'm using the below query:

[{"$unwind" : "$Sensor_data" },
{"$match": {"Floor_Id" : {"$eq": "${http.param.floorid}"}}},
{"$group": {
     "_id": {"Floor_Id": "$Floor_Id", 
         "DailyDate":{"$dateFromParts":{
                    "year":{"$year":{"$add": [ISODate("1970-01-01"), "$Sensor_data.Timestamp"]}},
                    "month":{"$month":{"$add": [ISODate("1970-01-01"), "$Sensor_data.Timestamp"]}},
                    "day":{"$dayOfMonth":{"$add": [ISODate("1970-01-01"), "$Sensor_data.Timestamp"]}}
                      } 
                  }
             },
     "AssetLoad": {"$sum": "$Sensor_data.Load" },
     "AssetDistance": {"$sum" : {"$subtract": [{"$toDecimal":"$Sensor_data.End_odometer"},
                                                {"$toDecimal":"$Sensor_data.Start_odometer"}]}}
    }
},
{
    "$group" : {
            "_id": {"Floor_Id": "$_id.Floor_Id"},
            "TotalLoad": { "$sum": "$AssetLoad" },
            "TotalDistance" : { "$sum": "$AssetDistance" },
            "AssetStatus":{
                "$push":{
                    "TotalLoad": "$AssetLoad",
                    "LoadUnit": "Kgs",
                    "TotalDistance" : "$AssetDistance",
                    "Date": "$_id.DailyDate"
                    }
                }
        }
},
{"$project" : {
    _id:0,
    "AssetStatus" : 1

    }
}]

Error

The processor shows 'Error running mongodb aggregation query: Unrecognized Token ISODate: was expecting null, true, false or NaN'

How can I resolve this issue? (I do not want to update the format of date from BSON Long to ISODate in the collection)


Solution

  • RESOLVED

    Replaced ISODate with $convert operator in Run Mongo Aggregation processor