c# streaminsight

CEPStream delay and on event end


Suppose I have an interval event with a known end time. Is there a way to fire off another event before that interval event expires? I need to compute an aggregate at that point.

Alternatively, is there a way to write a query like the following? (This is not what I want, but I might have to resort to it.)

(from input in inputStream
select input).AddDelay(input.EndTime - DateTime.UtcNow)

In case my understanding is completely off, please suggest a better approach.


Solution

  • First, we need to filter your source stream to get a stream of x1 events.

    var x1Stream = from e in sourceStream
                where e.ItemId == "X1"
                select new {e.ItemId, e.Timestamp};
    

    Next, we need to filter your source stream to get a stream of non-x1 events.

    var nonX1Stream = from e in sourceStream
                where e.ItemId != "X1"
                select e;
    

    Now we'll join the x1 event stream with the non-x1 event stream to get all the non-x1 events that occur during an x1 event. StreamInsight joins are temporal, so a join without a predicate pairs only events whose lifetimes overlap.

    var x = from l in x1Stream
        from r in nonX1Stream
        select new {l.ItemId, l.Timestamp, r};
    

    Counting the non-x1 events that occur during an x1 event requires a window, such as a HoppingWindow, so that the events in the stream can be counted over a fixed period of time. Alternatively, you can call ToEnumerable() and do the grouping without a window.

    var y = from e in x.ToEnumerable()
        group e by new {e.ItemId, e.Timestamp}
        into g
        select new {g.Key.ItemId, g.Key.Timestamp, Count = g.Count()};
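To make the logic of the steps above easier to follow, here is a minimal sketch that mimics them in plain LINQ to Objects, outside the StreamInsight engine. It is an illustration only: `IntervalEvent`, `Overlaps`, `CountDuringX1`, and the sample data are hypothetical names that do not come from the StreamInsight API, and it assumes half-open `[Start, End)` lifetimes. The explicit `where Overlaps(l, r)` stands in for the lifetime overlap that the engine's temporal join applies.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for an interval event; not a StreamInsight type.
public sealed class IntervalEvent
{
    public string ItemId { get; }
    public DateTime Start { get; }  // inclusive (assumption)
    public DateTime End { get; }    // exclusive (assumption)

    public IntervalEvent(string itemId, DateTime start, DateTime end)
    {
        ItemId = itemId;
        Start = start;
        End = end;
    }
}

public static class OverlapCountSketch
{
    // Two half-open intervals overlap when each one starts before the other ends.
    public static bool Overlaps(IntervalEvent a, IntervalEvent b)
    {
        return a.Start < b.End && b.Start < a.End;
    }

    // For each X1 event that overlaps at least one non-X1 event, return the
    // X1 start time and the number of overlapping non-X1 events.
    public static List<KeyValuePair<DateTime, int>> CountDuringX1(IEnumerable<IntervalEvent> source)
    {
        var events = source.ToList();
        var x1 = events.Where(e => e.ItemId == "X1");
        var nonX1 = events.Where(e => e.ItemId != "X1");

        // Cross join restricted to overlapping lifetimes.
        var joined = from l in x1
                     from r in nonX1
                     where Overlaps(l, r)
                     select new { l.ItemId, Timestamp = l.Start, r };

        // Group the pairs by X1 event and count the members of each group.
        var counts = from e in joined
                     group e by new { e.ItemId, e.Timestamp } into g
                     orderby g.Key.Timestamp
                     select new KeyValuePair<DateTime, int>(g.Key.Timestamp, g.Count());

        return counts.ToList();
    }

    // Made-up sample data: two X1 intervals and four other events.
    public static List<IntervalEvent> SampleEvents()
    {
        var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        return new List<IntervalEvent>
        {
            new IntervalEvent("X1", t0, t0.AddMinutes(10)),
            new IntervalEvent("A", t0.AddMinutes(2), t0.AddMinutes(3)),
            new IntervalEvent("B", t0.AddMinutes(9), t0.AddMinutes(12)),
            new IntervalEvent("C", t0.AddMinutes(10), t0.AddMinutes(11)), // starts as the first X1 ends
            new IntervalEvent("X1", t0.AddMinutes(20), t0.AddMinutes(30)),
            new IntervalEvent("D", t0.AddMinutes(25), t0.AddMinutes(26)),
        };
    }

    public static void Main()
    {
        foreach (var pair in CountDuringX1(SampleEvents()))
        {
            Console.WriteLine("{0:o} -> {1}", pair.Key, pair.Value);
        }
    }
}
```

With the sample data, the first X1 interval overlaps A and B but not C, and the second overlaps only D. Because the counts come from a join followed by a group, an X1 event with no overlapping events produces no row at all rather than a count of zero.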