I'm looking for a way to group or window Esper events in a dynamic window, similar to what Apache Flink calls Session Windows.
I'd like to create a Context per session, but so far I have been unable to find a way to capture sessions. Initial (not working) example:
CREATE SCHEMA EventX AS (sensorId string, timestamp long, value double);
CREATE SCHEMA SessionEvent AS (sensorId string, totalValue double, events EventX[]);
-- Unsure about Context definition
CREATE CONTEXT sensorSessionCtx
CONTEXT sensorCtx PARTITION BY sensorId FROM EventX,
CONTEXT sessionCtx INITIATED BY EventX TERMINATED BY pattern [every EventX -> (timer:interval(3 sec) and not EventX)];
CONTEXT sensorSessionCtx
INSERT INTO SessionEvent
SELECT sensorId
, SUM(value) AS totalValue
, window(*) AS events
FROM EventX#keepall
@Name('Sessions') SELECT * FROM SessionEvent;
And some test data for the Esper EPL Online tool:
EventX = {sensorId='A', timestamp=0, value=1.1}
t=t.plus(1 seconds)
EventX = {sensorId='A', timestamp=1000, value=3.2}
t=t.plus(1 seconds)
EventX = {sensorId='B', timestamp=2000, value=8.4}
t=t.plus(1 seconds)
EventX = {sensorId='A', timestamp=3000, value=2.7}
EventX = {sensorId='B', timestamp=3000, value=0.2}
t=t.plus(3 seconds)
EventX = {sensorId='A', timestamp=6000, value=3.1}
Expected output:
SessionEvent={sensorId='A', totalValue=7.0, events={[EventX={sensorId='A', timestamp=0, value=1.1},EventX={sensorId='A', timestamp=1000, value=3.2},EventX={sensorId='A', timestamp=3000, value=2.7}]}}
SessionEvent={sensorId='B', totalValue=8.6, events={[EventX={sensorId='B', timestamp=2000, value=8.4},EventX={sensorId='B', timestamp=3000, value=0.2}]}}
SessionEvent={sensorId='A', totalValue=3.1, events={[EventX={sensorId='A', timestamp=6000, value=3.1}]}}
How would I create (dynamic) session windows (or contexts) in Esper?
To get a session context in Esper, we need to create a nested context.
In this nested context, a keyed segmented context is defined first, to split the stream into separate per-user (or, as in the example, per-sensor) streams. We then define a second, non-overlapping context. The second context only operates within the first keyed context, so all events in it have the same sensorId.
CREATE CONTEXT sensorSessionCtx
CONTEXT sensorCtx PARTITION BY sensorId FROM EventX
, CONTEXT sessionCtx START EventX
END pattern [every (timer:interval(3 sec) AND NOT EventX)];
In the example, the output with this context definition would be:
At: 2001-01-01 08:00:06.000
SessionEvent={sensorId='A', totalValue=7.000000000000001, events={[EventX={sensorId='A', timestamp=0, value=1.1},EventX={sensorId='A', timestamp=1000, value=3.2},EventX={sensorId='A', timestamp=3000, value=2.7}]}}
SessionEvent={sensorId='B', totalValue=8.6, events={[EventX={sensorId='B', timestamp=2000, value=8.4},EventX={sensorId='B', timestamp=3000, value=0.2}]}}
At: 2001-01-01 08:00:09.000
SessionEvent={sensorId='A', totalValue=3.1, events={[EventX={sensorId='A', timestamp=6000, value=3.1}]}}
Note that when the internal engine timer has been disabled, the last session only terminates once an event (any kind will do) is received with a timestamp greater than the last session event's timestamp plus the session gap. In the EPL Online tool this can be accomplished by adding t=t.plus(10 seconds) after the last defined input event.
This is especially important if you want to write any kind of unit test, for example:
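import com.espertech.esper.client.time.CurrentTimeEvent
// Advance engine time far past the last session so that the final session terminates.
val finalEventTimestamp = Long.MaxValue - 1 // Note: Long.MaxValue won't trigger final evaluation!
engine.getEPRuntime.sendEvent(new CurrentTimeEvent(finalEventTimestamp))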
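For checking expected sessions in a unit test without the engine, the session semantics described above can be sketched in plain Java. This is only an illustration and not Esper code: the class SessionSketch and its methods onEvent, advanceTo and closed are hypothetical names. It assumes that event timestamps double as the clock, and that a session closes once the gap has fully elapsed (3000 ms here).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch of per-key session windows; not part of Esper.
final class SessionSketch {
    static final class Event {
        final String sensorId;
        final long timestamp;
        final double value;
        Event(String sensorId, long timestamp, double value) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    static final class Session {
        final String sensorId;
        final List<Event> events = new ArrayList<>();
        double totalValue;
        Session(String sensorId) { this.sensorId = sensorId; }
        long lastTimestamp() { return events.get(events.size() - 1).timestamp; }
    }

    private final long gapMs;
    // One open session per sensorId (the "keyed" outer context).
    private final Map<String, Session> open = new LinkedHashMap<>();
    private final List<Session> closed = new ArrayList<>();

    SessionSketch(long gapMs) { this.gapMs = gapMs; }

    // Move the clock forward: close every open session whose last event
    // is at least gapMs older than nowMs.
    void advanceTo(long nowMs) {
        Iterator<Session> it = open.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (nowMs - s.lastTimestamp() >= gapMs) {
                closed.add(s);
                it.remove();
            }
        }
    }

    // Assumption: the event timestamp is used as the current time.
    void onEvent(Event e) {
        advanceTo(e.timestamp);
        Session s = open.computeIfAbsent(e.sensorId, Session::new);
        s.events.add(e);
        s.totalValue += e.value;
    }

    List<Session> closed() { return closed; }
}
```

Feeding the six test events from the question closes the first A and B sessions when the event at timestamp 6000 arrives. The last A session stays open until advanceTo is called with a time of at least 9000, which mirrors the need for a final time advance described above.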