I would like to merge the incoming events into one event based on one of the fields.
Input Events:
{
ID: '123',
eventType: 'a',
eventCode: 1
},
{
ID: '123',
eventType: 'b',
eventCode: 2
},
{
ID: '123',
eventType: 'c',
eventCode: 3
}
Expected Output:
{
ID: '123',
events: [{
eventType: 'a',
eventCode: 1
},
{
eventType: 'b',
eventCode: 2
},
{
eventType: 'c',
eventCode: 3
}]
}
I am grouping the events based on a window of 4. So, I need to process the 4 events, merge them and pass it onto the next step.
Use Case: I would like to use the generated output to be stored in MongoDB OR pass it onto an external service.
Is this possible using Siddhi?
NOTE: I see that a similar question has already been asked, but the response is from 5 years ago, and Siddhi has come long way since then.
You can use below Siddhi apps to achieve your requirement. I have utilized string extension to do this. But please note generated output is exactly the one you requested. If you want a proper JSON output you might have to utilize execution json extention as well. Follow the readme for details on extension usage.
@App:name("testJsonConcat")
@App:description("Description of the plan")
-- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor.
define stream inputStream(id string, eventType string, eventCode int);
partition with (id of inputStream)
begin
from inputStream
select id, str:concat("{eventType: '", eventType, "' , eventCode :",eventCode,"}") as jsonString
insert into #formattedStream;
from #formattedStream#window.lengthBatch(4)
select str:concat("{ ID : '", id, "',events: [", str:groupConcat(jsonString),"]}") as result
insert into concatStream;
end;
from concatStream#log()
select *
insert into temp;