Stream - Merge similar events data into one event


I would like to merge the incoming events into one event based on one of the fields.

Input Events:

{
  ID: '123',
  eventType: 'a',
  eventCode: 1
},
{
  ID: '123',
  eventType: 'b',
  eventCode: 2
},
{
  ID: '123',
  eventType: 'c',
  eventCode: 3
}

Expected Output:

{
  ID: '123',
  events: [{
    eventType: 'a',
    eventCode: 1
  },
  {
    eventType: 'b',
    eventCode: 2
  },
  {
    eventType: 'c',
    eventCode: 3
  }]
}

I am grouping the events using a window of 4, so I need to process 4 events at a time, merge them, and pass the result on to the next step.

Use case: I would like to store the generated output in MongoDB or pass it on to an external service.

Is this possible using Siddhi?

NOTE: I see that a similar question has already been asked, but the response is from 5 years ago, and Siddhi has come a long way since then.


Solution

  • You can use the Siddhi app below to achieve your requirement. I have used the string extension to do this. Please note that the generated output matches the format you requested, which is not strictly valid JSON (the keys are unquoted and the strings use single quotes). If you want proper JSON output, you might have to use the JSON execution extension (siddhi-execution-json) as well. See each extension's README for details on its usage.

    @App:name("testJsonConcat")
    @App:description("Description of the plan")
    
    -- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor. 
    
    define stream inputStream(id string, eventType string, eventCode int);
    
    -- One partition instance per id, so each id gets its own window.
    partition with (id of inputStream)
    begin
        -- Format each event as a fragment of the merged output.
        from inputStream
        select id, str:concat("{eventType: '", eventType, "' , eventCode :",eventCode,"}") as jsonString
        insert into #formattedStream;
    
        -- Emit one merged event for every 4 events of the same id.
        from #formattedStream#window.lengthBatch(4)
        select str:concat("{ ID : '", id, "',events: [", str:groupConcat(jsonString),"]}") as result
        insert into concatStream;
    end;
    
    -- Log each merged event.
    from concatStream#log()
    select *
    insert into temp;
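
To make the expected behaviour of the app above easier to check, here is a minimal sketch in plain Python (not Siddhi) that models the same steps: one buffer per id, a batch of 4, and string concatenation. The function name `merge_events` and the dict-based event shape are hypothetical, chosen only for illustration, and the sketch assumes `str:groupConcat` joins values with a comma, which I believe is its default separator.

```python
from collections import defaultdict

BATCH_SIZE = 4  # mirrors #window.lengthBatch(4) in the Siddhi app


def merge_events(events, batch_size=BATCH_SIZE):
    """Model of the app: buffer events per id, emit one string per full batch.

    `merge_events` is a hypothetical helper, not part of Siddhi.
    """
    pending = defaultdict(list)  # one buffer per id, like one partition instance per id
    results = []
    for event in events:
        # First query: format each event as a fragment of the output.
        fragment = "{eventType: '%s' , eventCode :%d}" % (
            event["eventType"],
            event["eventCode"],
        )
        batch = pending[event["id"]]
        batch.append(fragment)
        # Second query: once the batch is full, join the fragments and emit.
        # Assumption: a comma separator, as str:groupConcat is believed to use.
        if len(batch) == batch_size:
            results.append(
                "{ ID : '%s',events: [%s]}" % (event["id"], ",".join(batch))
            )
            batch.clear()
    return results


if __name__ == "__main__":
    sample = [
        {"id": "123", "eventType": t, "eventCode": c}
        for t, c in [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
    ]
    for line in merge_events(sample):
        print(line)
```

In this model nothing is emitted until 4 events with the same id have arrived, so the three-event input from the question would produce no output on its own. If you need a result per 3 events, the batch length is the value to change.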