wso2siddhi

WSO2 CEP SiddhiQL


I have multiple sensors sending measurement events to a stream. An event consists of {parameter, value, timestamp}. I want to observe these values over a time window of a few days, check the trends, and make diagnostics about the equipment being monitored by these sensors.

  1. Divide the streams by parameter.
    from inputStream[parameter='A']
    select *
    insert into Astream;

and so on for each parameter being received.

  2. For the time window, say 60 seconds, compute a linear regression to find the change.
    from Astream#timeseries:lengthTimeRegress(60000, value, timestamp)
    select beta1 * 100 as AChange 
    insert into AChangeStream;

This I do for each metric stream.

  3. Once I have the trend for each stream, I collect the change values from each stream and check whether they meet the condition.

    from every e1=AChangeStream[e1.AChangeStream > 0.5], e2=BChangeStream[e2.BChangeStream > 0.15]
    select 'condition 1 alarm' as message
    insert into alertStream;

Will the above SiddhiQL detect the changes in 6 parameters in the time window?


Solution

  • The gist of the query you have provided is correct, except for a few minor things you have missed. When you say 6 parameters, I assume you mean something like parameters A, B, C, D, E and F. Ultimately, you seem to want to find a sequence of events that matches the given condition [1].

    Considering only two parameters, A and B, you could write the queries in SiddhiQL as follows to achieve your requirement.

    
    @Import('input:1.0.0')
    define stream inputStream (parameter string, value double, timestamp long);    
    
    
    from inputStream[parameter=='A']
    select *
    insert into Astream;    
    
    from inputStream[parameter=='B']
    select *
    insert into Bstream;    
    
    from Astream#timeseries:lengthTimeRegress(60000, 10000, value, timestamp)
    select beta1*100 as AChange 
    insert into AChangeStream;    
    
    from Bstream#timeseries:lengthTimeRegress(60000, 10000, value, timestamp)
    select beta1*100 as BChange 
    insert into BChangeStream;    
    
    from every e1=AChangeStream[e1.AChange > 0.5], e2=BChangeStream[e2.BChange > 0.15]
    select 'condition 1 alarm' as message
    insert into alertStream;
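
To reason about when the last query fires, here is a small Python model of the sequence condition. It is only a sketch under one assumption: that a sequence, as described in [1], requires the matching events to arrive consecutively, so any other event on either change stream discards a partial match. The function name `match_sequence` and the stream labels "A" and "B" (standing for AChangeStream and BChangeStream) are made up for illustration; the thresholds are the ones from the query.

```python
def match_sequence(events, a_threshold=0.5, b_threshold=0.15):
    """Simplified model of: every e1=AChangeStream[...], e2=BChangeStream[...].

    `events` is a list of (stream, change) tuples in arrival order, where
    stream is "A" or "B". Returns the (AChange, BChange) pairs that would
    raise an alert under the strict-consecutive assumption.
    """
    alerts = []
    pending_a = None  # AChange of a partial match waiting for a B event
    for stream, change in events:
        if pending_a is not None and stream == "B" and change > b_threshold:
            # e1 was matched by the previous event and e2 matches now.
            alerts.append((pending_a, change))
            pending_a = None
            continue
        # Any other event breaks the partial match (assumption, see above).
        pending_a = None
        if stream == "A" and change > a_threshold:
            # "every" lets a new partial match start at any qualifying A event.
            pending_a = change
    return alerts


if __name__ == "__main__":
    print(match_sequence([("A", 0.6), ("B", 0.2)]))
    print(match_sequence([("A", 0.6), ("A", 0.1), ("B", 0.2)]))
```

Under this reading, a qualifying A change only raises the alarm if the very next change event is a qualifying B change. If the two change streams emit at different rates, it may be worth checking whether a pattern or a join fits the requirement better than a sequence.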
    
    
    

    Please note the following.

    1. The lengthTimeRegress function takes four mandatory parameters, as specified in [2]. Your query omits the batch size, which sets the maximum number of events used for a regression calculation.
    2. In the sequence condition, you need to reference an attribute, not the stream name. What you wrote as e1.AChangeStream > 0.5 must be e1.AChange > 0.5.
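
To make the first note more concrete, the following Python sketch shows what a slope (beta1) over a window bounded by both a time span and a batch size could look like. It is not the Siddhi implementation: the class `WindowedSlope` and the helper `slope` are hypothetical names, the fit is a plain ordinary least-squares line, and expiring events by their own timestamp rather than by arrival time is an assumption.

```python
from collections import deque


def slope(points):
    """Ordinary least-squares slope of y on x for a sequence of (x, y) pairs.

    Returns None when fewer than two points or no spread in x is available.
    """
    n = len(points)
    if n < 2:
        return None
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    sxx = sum((x - mean_x) ** 2 for x, _ in points)
    if sxx == 0:
        return None
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    return sxy / sxx


class WindowedSlope:
    """Keeps events within a time window, capped at batch_size events."""

    def __init__(self, time_window_ms, batch_size):
        self.time_window_ms = time_window_ms
        self.batch_size = batch_size
        self.points = deque()

    def add(self, timestamp, value):
        self.points.append((timestamp, value))
        # Expire events older than the time window (event time: an assumption).
        while timestamp - self.points[0][0] > self.time_window_ms:
            self.points.popleft()
        # Enforce the batch size: keep only the most recent events.
        while len(self.points) > self.batch_size:
            self.points.popleft()
        return slope(self.points)


if __name__ == "__main__":
    window = WindowedSlope(time_window_ms=60000, batch_size=10000)
    for t, v in [(0, 0.0), (1000, 2.0), (2000, 4.0)]:
        print(t, window.add(t, v))
```

One thing this makes visible: with timestamp as the independent variable, the slope is expressed per unit of timestamp. If the timestamps are in milliseconds, the slope is a change per millisecond, so thresholds such as 0.5 and 0.15 applied to beta1*100 should be chosen with that unit in mind.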

    [1] https://docs.wso2.com/display/CEP420/SiddhiQL+Guide+3.1#SiddhiQLGuide3.1-Sequence
    [2] https://docs.wso2.com/display/SIDDHIEXTENSIONS/Regression