Environment: WSO2 Stream Processor 4.3.0
Let's say I have two very simple streams:
A stream on which newly created (unfulfilled) requests arrive in real time (t1):
RequestStream(requestId)
A stream on which a requestId appears in real time once the request has been fulfilled (t2):
FulfilmentStream(requestId)
It's guaranteed that t2 is always > t1.
How can I write a SiddhiQL statement that identifies requestIds that appear in RequestStream (event1) and have not appeared in FulfilmentStream (event2) once 5 minutes have elapsed since event1?
Working Siddhi app based on Tishan's answer:
@App:name('FailedToFulfillInAmountOfTime')
@source(
type="kafka",
topic.list="some_topic",
threading.option="single.thread",
group.id="some_group",
bootstrap.servers="xxx.xxx.xxx.xxx:6667",
@Map(type="json", @attributes(request_id = '$.alarm_id', severity = '$.severity', managed_object = '$.ManagedObject')))
define stream OrigAlarmStream (request_id int, severity string, managed_object string);
@sink(type='log', prefix='Got this execution request')
define stream RequestStream (request_id int, severity string, managed_object string);
@sink(type='log', prefix='Got this fulfillment confirmation:')
define stream FulfillmentStream (request_id int, severity string, managed_object string);
@sink(type='log', prefix='This fulfillment was not done within 1 min:')
define stream AlertStream(request_id int);
@info(name='getExpiredRequests')
from every e1=RequestStream -> not FulfillmentStream[e1.request_id == request_id] for 1 min
select e1.request_id
insert into AlertStream;
@info(name='CopyFulfillments')
from OrigAlarmStream[severity == 'Clear']
select request_id, severity, managed_object
insert into FulfillmentStream;
@info(name='CopyRequests')
from OrigAlarmStream[severity != 'Clear']
select request_id, severity, managed_object
insert into RequestStream;
You can use logical patterns to achieve this. See the query below.
from every e1=RequestStream -> not FulfilmentStream[e1.requestId == requestId] for 5 min
select e1.requestId as requestId
insert into AlertStream;
Here we have defined a pattern with a not condition. It is triggered when an event arrives on RequestStream and no matching event arrives on FulfilmentStream within the following 5 minutes. Please refer to the logical patterns documentation for more information.
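To make the timing of the not ... for condition concrete, here is a minimal Python sketch of the intended semantics. It is not Siddhi code and says nothing about how the engine works internally. The function name `expired_requests`, the `(timestamp_seconds, request_id)` tuple layout, and the choice to count a fulfilment arriving exactly at the deadline as "in time" are all assumptions made for illustration.

```python
def expired_requests(requests, fulfilments, timeout_s=300.0):
    """Return (alert_time, request_id) for each request not fulfilled in time.

    requests and fulfilments are lists of (timestamp_seconds, request_id).
    This is a hypothetical offline model, not a Siddhi API.
    """
    alerts = []
    for req_time, req_id in requests:
        deadline = req_time + timeout_s
        # A request is fulfilled in time if a fulfilment with the same id
        # arrives after the request and no later than the deadline
        # (inclusive deadline is an assumption).
        fulfilled_in_time = any(
            ful_id == req_id and req_time < ful_time <= deadline
            for ful_time, ful_id in fulfilments
        )
        if not fulfilled_in_time:
            # The alert can only be raised once the full timeout has passed.
            alerts.append((deadline, req_id))
    return sorted(alerts)


# Request 1 is fulfilled after 60 s, request 2 only after 390 s,
# and request 3 is never fulfilled.
requests = [(0.0, 1), (10.0, 2), (20.0, 3)]
fulfilments = [(60.0, 1), (400.0, 2)]
alerts = expired_requests(requests, fulfilments)
print(alerts)  # [(310.0, 2), (320.0, 3)]
```

Note that request 2 is reported even though it is eventually fulfilled, because the fulfilment arrives after the 5-minute window has closed.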