
Siddhi data aggregation based on time


I am trying to aggregate sensor data over time windows and write the result to Cassandra each time a 30-second window completes (roll-up).

For example, a sensor named "temp" sends 3 readings within 30 seconds. I would like to get the average value for this sensor over those 30 seconds and write that average to Cassandra when the window completes.

This is my code:

BasicConfigurator.configure();

        
        // Create Siddhi Application
        String siddhiApp = "define stream SensorEventStream (sensorid string, value double); " +
                " " +
                "@info(name = 'query1') " +
                "from SensorEventStream#window.time(30 sec)  " +
                "select sensorid, avg(value) as value " +
                "group by sensorid " +
                "insert into AggregateSensorEventStream ;";

        // Creating Siddhi Manager
        SiddhiManager siddhiManager = new SiddhiManager();

        //Generating runtime
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        //Adding callback to retrieve output events from query
        siddhiAppRuntime.addCallback("AggregateSensorEventStream", new StreamCallback() {
            @Override
            public void receive(org.wso2.siddhi.core.event.Event[] events) {
                EventPrinter.print(events);
            }
        });

        //Retrieving input handler to push events into Siddhi
        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("SensorEventStream");

        //Starting event processing
        siddhiAppRuntime.start();

        //Sending events to Siddhi
        inputHandler.send(new Object[]{"Temp", 26d});
        Thread.sleep(1000);
        inputHandler.send(new Object[]{"Temp", 25d});
        Thread.sleep(1000);
        inputHandler.send(new Object[]{"Temp", 24d});
        Thread.sleep(60000);
        inputHandler.send(new Object[]{"Temp", 23d});
         
        //Shutting down the runtime
        siddhiAppRuntime.shutdown();

        //Shutting down Siddhi
        siddhiManager.shutdown();

And the output looks like this:

0 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281656960, data=[Temp, 26.0], isExpired=false}]
1002 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281657971, data=[Temp, 25.5], isExpired=false}]
2003 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281658972, data=[Temp, 25.0], isExpired=false}]
62004 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281718972, data=[Temp, 23.0], isExpired=false}]

With this demo code, an average is emitted immediately for each of the first 3 events, nothing happens when the 30-second window elapses, and then 23 is printed for the fourth event.

How can I get a notification when the window rolls up after 30 seconds? I thought that is what the receive function does.

I am not sure whether I have misunderstood the functionality here. Is this possible with Siddhi at all?


Solution

  • This is the expected behaviour. The window is a sliding window. When the first event arrives (1st second), the window holds only that event, so the average is 26. When the second event arrives, the window holds both 26d and 25d, so the average is 25.5. Likewise, at the 3rd second the average is 25. Around the 31st, 32nd and 33rd seconds these events expire from the window; no output appears at that point because `insert into` emits only current events by default, not expired ones. So when your 4th event arrives (around the 63rd second), it is the only event in the window, and the average is the value itself. In short, this window computes the average as soon as an event arrives, over the events received in the 30 seconds before it.

    From your question, it seems you want a timeBatch window. With it, the average is emitted only at the end of each batch, for instance at the 30th, 60th and 90th seconds, and so on. Please see the timeBatch documentation for samples.
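
To make the difference concrete, here is a minimal plain-Java sketch that imitates both behaviours on the timings from the question (events at roughly 0 s, 1 s, 2 s and 62 s). It does not use Siddhi at all: the class `WindowAverageSketch` and its two methods are hypothetical names made up for illustration, it handles a single sensor without group-by, and it assumes that batches are aligned to the arrival time of the first event.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this is not Siddhi code.
public class WindowAverageSketch {

    // Sliding window: one average per arriving event, computed over the
    // events that arrived less than windowMs before it (itself included).
    public static double[] slidingAverages(long[] arrivalMs, double[] values, long windowMs) {
        Deque<Integer> window = new ArrayDeque<>(); // indices of events still in the window
        double sum = 0.0;
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            // Expire events that are at least windowMs older than the current arrival.
            while (!window.isEmpty() && arrivalMs[i] - arrivalMs[window.peekFirst()] >= windowMs) {
                sum -= values[window.pollFirst()];
            }
            window.addLast(i);
            sum += values[i];
            out[i] = sum / window.size();
        }
        return out;
    }

    // Batch (tumbling) window: one average per non-empty batch, keyed by the
    // time at which the batch ends. Assumption: batches start at the first arrival.
    public static Map<Long, Double> batchAverages(long[] arrivalMs, double[] values, long windowMs) {
        Map<Long, Double> out = new TreeMap<>();
        if (arrivalMs.length == 0) {
            return out;
        }
        Map<Long, double[]> sumAndCount = new TreeMap<>(); // batch end -> {sum, count}
        long start = arrivalMs[0];
        for (int i = 0; i < values.length; i++) {
            long batchEnd = start + ((arrivalMs[i] - start) / windowMs + 1) * windowMs;
            double[] acc = sumAndCount.computeIfAbsent(batchEnd, k -> new double[2]);
            acc[0] += values[i];
            acc[1] += 1;
        }
        for (Map.Entry<Long, double[]> e : sumAndCount.entrySet()) {
            out.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] arrivalMs = {0L, 1_000L, 2_000L, 62_000L};
        double[] values = {26d, 25d, 24d, 23d};
        System.out.println(Arrays.toString(slidingAverages(arrivalMs, values, 30_000L)));
        System.out.println(batchAverages(arrivalMs, values, 30_000L));
    }
}
```

The sliding version reproduces the four averages from the question's log (26.0, 25.5, 25.0, 23.0). The batch version yields one average of 25.0 for the batch ending at 30 s and one of 23.0 for the batch ending at 90 s. In the question's query, the corresponding change would be replacing `#window.time(30 sec)` with `#window.timeBatch(30 sec)`. Note that the demo shuts the runtime down right after sending the fourth event, so the batch containing that event would probably never be emitted unless the program keeps running until the batch ends.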