Search code examples
complex-event-processingcumulocity

Working on historical data (incrementing properties)


I'm trying to write a CEP rule that would take all the existing ACTIVE alarms and increase a specific fragment bike_alarm.priority by 1 every minute. This is the whole structure of alarm:

{
    "count": 1,
    "creationTime": "2018-07-09T15:30:20.790+02:00",
    "time": "2014-03-03T12:03:27.845Z",
    "history": {
        "auditRecords": [],
        "self": "https://cumulocity.com/audit/auditRecords"
    },
    "id": "1024",
    "self": "https://cumulocity.com/alarm/alarms/1024",
    "severity": "WARNING",
    "source": {
        "id": "22022",
        "name": "01BIKE_STATION_NORTH",
        "self": "https://cumulocity.com/inventory/managedObjects/22022"
    },
    "status": "ACTIVE",
    "text": "Bike disconnected",
    "type": "bike_error_01",
    "bike_alarm" {
        "priority": 10
    }
}

This is what I managed to create (based mainly on this question):

create schema Alarm as Alarm;

create schema CollectedAlarms(
    alarms List
);

create schema SingleAlarm(
    alarm Alarm
);

@Name("Collecting alarms")
insert into CollectedAlarms
select findAllAlarmByTimeBetween(
    current_timestamp().minus(100 day).toDate(), 
    current_timestamp().toDate()
) as alarms
from pattern[every timer:interval(30 sec)];

@Name("Splitting alarms")
insert into SingleAlarm
select
    singleAlarm as alarm
from 
    CollectedAlarms as alarms unidirectional,
    CollectedAlarms[alarms@type(Alarm)] as singleAlarm;

@Name("Rising priority")
insert into UpdateAlarm
select
    sa.alarm.id as id,
    {
        "bike_alarm.priority", GetNumber(sa.alarm, "bike_alarm.priority". 0) + 1
    } as fragments
from pattern [every sa = SingleAlarm -> (timer:interval(1 minutes))];

the problem is that not all alarms are founded and even those that are the incrementation don't work, priority is set to null.

Additionally, could you point me in direction of some better documentation? Is this something you use?


Solution

  • In general the esper documentation that you linked is the best place to look for the generic syntax. In combination you probably sometimes also need the Cumulocity documentation for the specific stuff: http://cumulocity.com/guides/event-language/introduction

    Coming to your problems: You are miss-using a realtime processing engine to do cron-like batch operations. While it technically can be done this might not be the best approach and I will show you a different approach that you can take.

    But first solving your approach:

    1. The database queries like findAllAlarmByTimeBetween() will only return up to 2000 results and there is no way to get the next page like on the REST API of Cumulocity. Also if you say you want to handle only active alarms you should use a function that also filters for the status.
    2. Getting null out of a function like getNumber() means the JsonPath wasn't found or the dataType is incorrect (using getNumber for a String). You can set a default value for that case as the third parameter. From you Json that you provided it looks correct though. The syntax errors in your code are copy paste errors I assume as otherwise you wouldn't have been able to deploy it.

    In my opinion you should approach that differently: On each incoming alarm raise the priority after one minute if it hasn't been cleared. Additionally trigger the 1 minute timer again. Like a loop until the alarm is cleared.

    The pattern for this would like like that:

    from pattern [
       every e = AlarmCreated(alarm.status = CumulocityAlarmStatuses.ACTIVE) 
       -> (timer:interval(1 minutes) 
          and not AlarmUpdated(alarm.status != CumulocityAlarmStatuses.ACTIVE, alarm.id.value = e.alarm.id.value))
    ];
    

    You need one with AlarmCreated which will only cover the initial increase and a second statement that triggers on your own schema which is then run in a loop.

    In general try to avoid as many database calls as you can. So keep the loop counter in your schema so you only always need to execute the update call of the alarm.