Tags: wso2, siddhi, event-stream, wso2-cep

WSO2: Updating an output stream in a Siddhi query


I have a Siddhi query that counts the total events in one minute using a timeBatch window. Through an output stream I am updating a bar chart with the incoming values, with timestamps (date and time, to the minute) on the x-axis and event counts on the y-axis.

But sometimes the events belonging to one minute take too long to be transmitted, so the query does not give correct results.

For instance, suppose 60 events belong to one minute. The query first gives a count of 40, which is displayed in the bar chart, but a minute later it emits 20 for the same timestamp. That is correct according to the logic, but is there a way to update the stream, and with it the bar chart, for a previous timestamp (here 40 + 20 = 60) while still inserting new values for upcoming timestamps?

I have seen that the update function is used with tables, not streams. Is that so? I also want two output streams populating two different bar charts from the same input stream. Is the query below correct for that purpose?

Query is:

/* Enter a unique ExecutionPlan */
@Plan:name('FTPExecutionPlan')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')

/* define streams/tables and write queries here ... */

@Import('FTPInStream:1.0.0')
define stream FTPInStream (ts string, uid string, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, user string, password string,command string, arg string, mime_type string, file_size string, reply_code int, reply_msg string);

@Export('FTPIPOutStream:1.0.0')
define stream FTPIPOutStream (ip_address string, ftp_requests int);

@Export('FTPOutStream:1.0.0')
define stream FTPOutStream (ts string, ftp_requests int);

from FTPInStream
select time:dateFormat(str:replaceAll(ts,'T',' '),'yyyy-MM-dd HH:mm', 'yyyy-MM-dd HH:mm:ss') as ts, uid, id_orig_h, id_orig_p, id_resp_h, id_resp_p
insert into intermediateStream;

from intermediateStream#window.timeBatch(1 min)
select ts, cast(count(ts), 'int') as ftp_requests
group by ts
insert into FTPOutStream;

from intermediateStream#window.timeBatch(1 min)
select id_orig_h as ip_address, cast(count(id_orig_h), 'int') as ftp_requests
group by id_orig_h
insert into FTPIPOutStream;

Solution

  • But sometimes the events belonging to one minute take too long to be transmitted, so the query does not give correct results.

    In that situation, you should probably use the externalTimeBatch window instead of the timeBatch window. It batches events according to a timestamp carried in each event rather than the system clock time at which the events arrive.
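
    A minimal sketch of that change, assuming ts always looks like '2016-05-01T12:34:56' (the format implied by the dateFormat call in the question) and that the str and time extensions are available. The stream name timedStream and the attribute event_ts are made up for illustration. externalTimeBatch expects the event time as a long, so the string is first converted to epoch milliseconds; the select clause of the second query is copied from the question unchanged:

    ```
    -- Derive a long event time (event_ts) alongside the minute-level label (ts).
    from FTPInStream
    select time:timestampInMilliseconds(str:replaceAll(ts, 'T', ' '), 'yyyy-MM-dd HH:mm:ss') as event_ts,
           time:dateFormat(str:replaceAll(ts, 'T', ' '), 'yyyy-MM-dd HH:mm', 'yyyy-MM-dd HH:mm:ss') as ts
    insert into timedStream;

    -- Batch by the time carried in the event, not by arrival time.
    from timedStream#window.externalTimeBatch(event_ts, 1 min)
    select ts, cast(count(ts), 'int') as ftp_requests
    group by ts
    insert into FTPOutStream;
    ```

    With this window a batch closes only when an event from a later minute arrives, so slow transmission no longer splits one minute's events across two batches. Events that arrive after their batch has already closed are still not merged back, and the optional extra arguments of externalTimeBatch differ between Siddhi versions, so check the documentation for the version you run.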

  • I have seen that the update function is used with tables, not streams. Is that so?

    Yes. Only event tables support delete and update operations; event streams do not.
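
    If late counts must be folded into an earlier minute (the 40 + 20 case), one option is to keep the running totals in an event table. The following is an untested sketch in Siddhi 3.x syntax. FTPCountTable is a hypothetical table name, and the sketch assumes that the queries see each event in the order they are written, so the update runs before the insert:

    ```
    define table FTPCountTable (ts string, ftp_requests int);

    -- Minute already in the table: add the late count to the stored one.
    from FTPOutStream as s join FTPCountTable as t
        on s.ts == t.ts
    select s.ts as ts, s.ftp_requests + t.ftp_requests as ftp_requests
    update FTPCountTable
        on FTPCountTable.ts == ts;

    -- Minute not yet in the table: insert it as a new row.
    from FTPOutStream[not ((FTPCountTable.ts == ts) in FTPCountTable)]
    select ts, ftp_requests
    insert into FTPCountTable;
    ```

    The stream itself still carries 40 and then 20 as separate events; only the table holds the combined 60, so the chart would have to be fed from the table (or from a stream derived from it) to show the corrected bar.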

  • I also want two output streams populating two different bar charts from the same input stream. Is the query below correct for that purpose?

    Yes. Since you are exporting both FTPOutStream and FTPIPOutStream, you can use them to populate two different bar charts.