Search code examples
complex-event-processingsiddhiwso2-streaming-integrator

Is it possible to Group By a field extracted from JSON input in a Siddhi Query?


I currently have an stream, with 1 string attribute that contains a Json event.

This stream receives different events, which I want to apply Json path expressions so I can use those attributes on filters and functions.

JsonPath extractors work like a charm on filters and selectors, unfortunately, I am not being able to use them for the 'Group By' part. I am actually doing it in an embedded Siddhi App with siddhi-execution-json extension added manually, but for the discussion, so everybody can easily check and test it, I will paste an example app that works on WSO2 Stream Processor. The objective looks like the following App:

@App:name("Group_by_json_attribute")

define stream JsonStream(json string);

@sink(type='log')
define stream LogStream(myField string, count long);

@info(name='query1')
from JsonStream#window.time(10 sec)
select json:getString(json, '$.myField') as myField, count() as count 
group by myField having count > 1 
insert into LogStream;

and it can accept the following events:

{"myField": "my_value"}

However, this query will raise the error:

Cannot find attribute type as 'myField' does not exist in 'JsonStream'; define stream JsonStream(json string)

I have also tried to use directly the Json extractor at 'Group by':

group by json:getString(json, '$.myField') as myField having count > 1

However the error now is:

mismatched input ':' expecting {',', ORDER, LIMIT, OFFSET, HAVING, INSERT, DELETE, UPDATE, RETURN, OUTPUT}

which seems to not be expecting to use an extension here

I am just wondering, if it is possible to group by attributes not directly defined in the input stream. In this case is a field extracted from a JSON object, but it could be any other function that generates another attribute.

I am also using versions from maven central repository

  • Siddhi: io.siddhi:siddhi-core:5.0.1
  • siddhi-execution-json: io.siddhi.extension.execution.json:siddhi-execution-json:2.0.1

(Edit) Clarification

The objective is, to use attributes not directly defined in the Stream, to be used on the Group By.

The reason why is, I currently have an embedded app which defines the whole set of input streams coming from external sources formatted as JSON, and there are also a set of output streams to inform external components when a query matches. This app allows users to create custom queries on this set of predefined Streams, but they are not able to create Streams by their own.

Many thanks!


Solution

  • It seems we are expecting the group by fields from the query input stream, in this case, JsonStream. Use another query before this for extraction and the aggregation and filtering in the following query,

    @App:name("Group_by_json_attribute")
    
    define stream JsonStream(json string);
    
    @sink(type='log')
    define stream LogStream(myField string, count long);
    
    @info(name='extract_stream')
    from JsonStream
    select json:getString(json, '$.myField') as myField 
    insert into ExtractedStream;
    
    @info(name='query1')
    from ExtractedStream#window.time(10 sec)
    select myField, count() as count 
    group by myField 
    having count > 1 
    insert into LogStream;