
Custom function to calculate on all events in an Esper window


What is the best or most idiomatic way to expose all events in a window to a custom function? The following example is modeled on the stock-price examples used in the Esper online documentation.


Suppose we have the following Esper query:

select avg(price), custom_function(price) from OrderEvent#unique(symbol)

The avg(price) part returns the average of the most recent price for each symbol. Suppose we want custom_function to work in a similar manner, but with more complex logic: it needs to iterate over every value in the window each time the result is needed (e.g. some outlier detection methods need such an algorithm).

To be clear, the algorithm has to look something like:

custom_function(window):
   for each event in window:
       update calculation

and there is no clever way to update the calculation as events enter or leave the window.


A custom aggregation could achieve this by adding events to a set as they enter the window and removing them as they leave, but this becomes problematic when primitive types are used. It also feels wasteful: presumably Esper already holds the collection of events in the window, so we would prefer not to duplicate it.

The Esper docs mention many ways to customize things (see this Solution Pattern, for example). They also mention that the 'pull API' can iterate over all events in a window.


What approaches are considered best to solve this type of problem?


Solution

  • To access all events at the same time, use window(*) or prevwindow(*).

    select MyLibrary.computeSomething(window(*)) from ...
    

    Here computeSomething is a public static method of the class MyLibrary (alternatively, define a UDF).
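A minimal sketch of what such a static method might look like, assuming OrderEvent is a plain Java class with a getPrice() getter, so that window(*) hands the method an array of the underlying events. The outlier rule (more than three median absolute deviations from the median) is only an illustrative stand-in for whatever whole-window calculation is needed, and the OrderEvent class shown here is a hypothetical placeholder for the real event class.

```java
import java.util.Arrays;

// Hypothetical stand-in for the question's event class; a real deployment
// would use the public class registered with Esper as the OrderEvent type.
class OrderEvent {
    private final String symbol;
    private final double price;

    public OrderEvent(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }

    public String getSymbol() { return symbol; }
    public double getPrice() { return price; }
}

public class MyLibrary {
    // Illustrative whole-window calculation: counts the prices that lie more
    // than 3 median absolute deviations (MAD) away from the median price.
    public static int computeSomething(OrderEvent[] events) {
        // Defensive guard: treat a missing or empty window as "no outliers".
        if (events == null || events.length == 0) {
            return 0;
        }
        double[] prices = new double[events.length];
        for (int i = 0; i < events.length; i++) {
            prices[i] = events[i].getPrice();
        }
        double median = median(prices);

        double[] deviations = new double[prices.length];
        for (int i = 0; i < prices.length; i++) {
            deviations[i] = Math.abs(prices[i] - median);
        }
        double mad = median(deviations);
        if (mad == 0.0) {
            return 0; // no spread to measure against
        }

        int outliers = 0;
        for (double deviation : deviations) {
            if (deviation > 3 * mad) {
                outliers++;
            }
        }
        return outliers;
    }

    // Median of a copy of the input, so the caller's array is left untouched.
    private static double median(double[] values) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1
                ? sorted[mid]
                : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }
}
```

Because the method receives the whole window on every evaluation, it can recompute from scratch each time, which matches the "for each event in window" requirement in the question.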

    To process the events one at a time, you could use an enumeration method. The aggregate method takes an initial value and an accumulator lambda. There is also an extension API for extending the existing enumeration methods that you could use.

    select window(*).aggregate(..., (value, eventitem) => ...) from ...
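To make the roles of the initial value and the accumulator lambda concrete, here is a plain-Java analogue of the aggregate enumeration method, written as a left fold. This is not Esper code: the class and method names are hypothetical, and it assumes, as in the snippet above, that the first lambda parameter is the running value and the second is the current item.

```java
import java.util.List;
import java.util.function.BiFunction;

public class AggregateSketch {
    // Plain-Java analogue of: window(*).aggregate(initial, (value, eventitem) => ...)
    // Starts from the initial value and feeds each item through the accumulator.
    static <T, R> R aggregate(List<T> window, R initial, BiFunction<R, T, R> accumulator) {
        R value = initial;
        for (T item : window) {
            value = accumulator.apply(value, item);
        }
        return value;
    }

    public static void main(String[] args) {
        // Example: highest price currently in the (simulated) window.
        List<Double> prices = List.of(10.0, 11.0, 12.0);
        double maxPrice = aggregate(prices, Double.NEGATIVE_INFINITY,
                (value, price) -> Math.max(value, price));
        System.out.println(maxPrice);
    }
}
```

A fold like this visits each event once per evaluation, so it suits calculations that can be expressed as a running value. Calculations that need the whole window at once (a median, for example) fit the window(*) plus static method approach better.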
    

    See the Esper documentation for window(*), the aggregate enumeration method, and the enumeration method extension API.