Search code examples
wso2siddhiwso2-cep

wso2 cep Siddhiql merge data from 2 streams


I want to do statistics and the statistics are in a percentage.

I have a query that tells me the number of events coming from one stream that contains several values ['R','B','A','C']. And i want to count how many Bs there are in stream1 in percentage.

Number of events in stream1

@info(name='Query 1')
from stream1
select count() as numEvents
insert into event_num;

Number of events that are of type B

@info(name='Query 2')
from stream1[value == 'B']
select count() as numBs
insert into b_num;

Percentage count:

@info(name='Query 3')
from every e1=event_num,e2=b_num
select (e2.numBs*100)/e1.numEvents as bpercent
insert into b_percent;

if there is a value that is not B, then query 3 will not increment the numEvents and recalculate the percentage. Meaning that the Bs percentage will only be calculated if a B comes in. Which is not what i want. I want B percentage to decrease if a different value comes in. and increase if B comes in.

How can I make the Query3 in a way that whether event_num or whether v_num receives a new event calculates de percentage?


Solution

  • How about following execution plan:

    @IndexBy('dummyId')
    define table counterTable (dummyId int, numBs long, numAll long);
    
    define stream stream1 (value string);
    
    @info(name='counting bValues')
    from stream1[value == 'B']
    select 1 as dummyId, count() as numBs
    update counterTable
        on counterTable.dummyId == dummyId;
    
    @info(name='counting allValues')
    from stream1
    select 1 as dummyId, count() as numAll
    update counterTable
        on counterTable.dummyId == dummyId;
    
    @info(name='processing allValues')
    from stream1 JOIN counterTable
    select (counterTable.numBs * 100) / counterTable.numAll as percentage
    insert into statStream;
    

    Here,

    • for each and every event, numAll counter will be incremented.
    • if a B-valued event comes, then numB counter will be counted.

    • percentage calculation will be done for each and every event arrival (regardless of the value)