Excessive memory usage in Esper while aggregating values over 60 minutes using a context


I want to aggregate sensor values for each hour of the day. The time information comes from timestamps in the event stream. To that end, I've created four EPL statements:

  1. The first statement declares a context that partitions events by sensorId.
  2. The second statement detects when the hour-of-day changes in the event stream of a particular sensor and sends an event when that occurs.
  3. The third statement is a context declaration that partitions events by sensorId and hourOfDay. The lifetime of the context is controlled by the events published by the second statement.
  4. The last statement makes use of the context created in statement #3 and calculates a sum over the sensor values.

I'm receiving events from approximately 300 sensors, each reporting every 2-6 seconds. I have Esper 6.1.0 embedded in a Java application with a heap size of 8 GB. After about 15 minutes, memory pressure is so high that garbage collection goes into overdrive, rendering the application unusable. If I remove the last EPL statement, my application behaves normally again.

I'm somewhat perplexed by this behavior. I thought that when a context is used, Esper would not accumulate events in memory.

So here's my question: Using EPL, how do I perform a simple aggregation (e.g. sum, average) such that the memory consumption is O(n) rather than O(n*t), where n is the number of sensors and t is the length of the time window?

For concreteness, here are my EPL statements.

Statement 1:

create context ctxPartitionById partition by sensorId from SensorEvent

Statement 2:

context ctxPartitionById
INSERT INTO HourChanged
SELECT
  E.sensorId as sensorId,
  prev(1, E.occurredAtHour) AS lastHour,
  E.occurredAtHour AS currentHour
FROM SensorEvent#length(2) E
WHERE E.occurredAtHour != prev(1, E.occurredAtHour)

Statement 3:

create context ctxPartitionByIdAndHour
  context PartitionedByKeys
    partition by
      sensorId, currentHour from HourChanged,
      sensorId, occurredAtHour from SensorEvent,
  context InitiateAndTerm
    initiated by HourChanged as e1
    terminated by HourChanged(sensorId=e1.sensorId and lastHour=e1.currentHour)

Statement 4:

context ctxPartitionByIdAndHour
SELECT
  E.sensorId,
  E.occurredAtHour,
  SUM(E.value) AS sensorValueSum
FROM SensorEvent E
output last when terminated

sensorId is an instance of a Java class that implements Comparable.

occurredAtHour is a java.util.Date instance. Its value is a timestamp that has been rounded down to the hour.

value is a double.


Solution

  • Adding @Hint('enable_outputlimit_opt') to the fourth and last EPL statement solves the issue.

    Apparently Esper retains events in memory when output last is used, unless this particular hint is present. See section 5.7.3, "Runtime Considerations", in the Esper docs.
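
    For concreteness, here is a sketch of what statement 4 from the question might look like with the hint applied. It assumes the annotation goes at the very start of the statement, ahead of the context clause, which is where EPL annotations are normally written; everything else is unchanged from the question.

    ```
    @Hint('enable_outputlimit_opt')
    context ctxPartitionByIdAndHour
    SELECT
      E.sensorId,
      E.occurredAtHour,
      SUM(E.value) AS sensorValueSum
    FROM SensorEvent E
    output last when terminated
    ```

    With the hint in place, the statement should only need to keep the running sum and the most recent row for each context partition, rather than every event seen during the hour.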