Search code examples
wso2espercomplex-event-processingsiddhi

How do I implement a continuous time window in CEP?


Say I have a stream of events and I want to be able to count how many of them there are in a time window. I would like to receive notification whenever an event enters the time window and changes the count and the same when an event exits the time window.

The below illustration show what I mean. I'm using a time window of length 4 and I want 3 notifications, one when the first event enters the window, the second when the second event enters and the third when the first event exits the time window.

enter image description here

How do I make a query that does that? What if I also want to group by an event's property?

Here's what I have so far, but it doesn't give me a notification when an event leaves the window: @config(async = 'true') define stream myStream (symbol string, timeStamp long) @info(name = 'query1') from myStream#window.externalTime(timeStamp,10 sec) select symbol, timeStamp, count(timeStamp) as eventCount group by symbol insert into outputStream. This is for SIddhi CEP, but I imagine Esper would be similar.


Solution

  • From any type of a window in WSO2 CEP, you can expect two types of events.

    1. Current events - these are triggered when a new event enters the window. i.e. the new event itself is used as a trigger
    2. Expired events - these are triggered when an existing event in the window exits it. i.e. in case of a time window of 1 minute, each event is kept for 1 minute and emitted at the end of 1 minute

    You can also use a combination of these two in the same query to trigger it from both types of events.

    An example query that uses both types of triggers in CEP 3.1.0 will be (check the docs here):

    from StockExchangeStream[symbol == 'WSO2']#window.time( 1 minute ) 
    select max(price) as maxPrice, avg(price) as avgPrice, min(price) as minPrice
    insert into WSO2StockQuote for all-events  
    

    if you want this to be triggered only using expired events, use 'expired-events' in place of 'all-events'. Same applies for current-events. If you don't specify anything, it defaults to current-events, that's why your current query doesn't get triggered for expired events.

    Note that for CEP 4.0.0 the syntax is a bit different, for correct syntax check the test source codes here (since the docs are still work-in-progress).