I'm looking for a way to group or window Esper events in a dynamic window, in a similar as to what Apache Flink call's Session Windows (see below)
I'd like to create a Contex per session, but so far have been unable to accomplish a way to capture sessions. Initial (not working) example;
CREATE SCHEMA EventX AS (sensorId string, timestamp long, value double);
CREATE SCHEMA SessionEvent AS (sensorId string, totalValue double, events EventX[]);
-- Unsure about Context definition
CREATE CONTEXT sensorSessionCtx
CONTEXT sensorCtx PARTITION BY sensorId FROM EventX,
CONTEXT sessionCtx INITITATED BY Eventx TERMINATED BY pattern [every EventX -> (timer:interval(3 sec) and not EventX)];
CONTEXT sensorSessionCtx
INSERT INTO SessionEvent
SELECT sensorId
, SUM(value) AS totalValue
, window(*) AS events
FROM EventX#keepall
OUTPUT LAST WHEN TERMNATED;
@Name('Sessions') SELECT * FROM SessionEvent;
And some testdata for Esper EPL Online
EventX = {sensorId='A', timestamp=0, value=1.1}
t=t.plus(1 seconds)
EventX = {sensorId='A', timestamp=1000, value=3.2}
t=t.plus(1 seconds)
EventX = {sensorId='B', timestamp=2000, value=8.4}
t=t.plus(1 seconds)
EventX = {sensorId='A', timestamp=3000, value=2.7}
EventX = {sensorId='B', timestamp=3000, value=0.2}
t=t.plus(3 seconds)
EventX = {sensorId='A', timestamp=6000, value=3.1}
Expected output;
SessionEvent={sensorId='A', totalValue=7.0, events={[EventX={sensorId='A', timestamp=0, value=1.1},EventX={sensorId='A', timestamp=1000, value=3.2},EventX={sensorId='A', timestamp=3000, value=2.7}]}}
SessionEvent={sensorId='B', totalValue=8.6, events={[EventX={sensorId='B', timestamp=2000, value=8.4},EventX={sensorId='B', timestamp=3000, value=0.2}]}}
SessionEvent={sensorId='A', totalValue=3.1, events={[EventX={sensorId='A', timestamp=6000, value=3.1}]}}
How would I create (dynamic) session windows (or contexts) in Esper?
To provide a Session-context in Esper, we need to create a Nested Context.
In this nested context we Keyed Segment context is defined first, to 'split' to stream into separate user, or sensor as per the example, streams. We then define a second Non-Overlapping context. The second context will only operate within the first keyed context and (thus all events in this context have the same sensorId).
CREATE CONTEXT sensorSessionCtx
CONTEXT sensorCtx
PARTITION BY sensorId FROM EventX
, CONTEXT sessionCtx
START EventX
END pattern [every (timer:interval(3 sec) AND NOT EventX)];
In the example, the output with this context definition would be;
At: 2001-01-01 08:00:06.000
Insert
SessionEvent={sensorId='A', totalValue=7.000000000000001, events={[EventX={sensorId='A', timestamp=0, value=1.1},EventX={sensorId='A', timestamp=1000, value=3.2},EventX={sensorId='A', timestamp=3000, value=2.7}]}}
Insert
SessionEvent={sensorId='B', totalValue=8.6, events={[EventX={sensorId='B', timestamp=2000, value=8.4},EventX={sensorId='B', timestamp=3000, value=0.2}]}}
At: 2001-01-01 08:00:09.000
Insert
SessionEvent={sensorId='A', totalValue=3.1, events={[EventX={sensorId='A', timestamp=6000, value=3.1}]}}
Although note that, when the internal engine timer has been disabled, for the last window to ever terminate, an event (any kind will do) should be received with a timestamp greater than the last session event timestamp + the session gap. In the EPL Online tool this can be accomplished by adding t=t.plus(10 seconds)
after the last defined input event. This is especially important should you want to create any kind of unit-test
val finalEventTimestamp = Long.MaxValue - 1 //Note: Long.MaxValue won't trigger final evaluation!
engine.getEPRuntime.sendEvent(new CurrentTimeEvent(finalEventTimestamp))