Search code examples
wso2complex-event-processingsiddhiwso2-cep

WSO2 CEP - Siddhi queries correlate coordinates of different event streams


I'm new to WSO2 CEP and Siddhi queries. I have created three different event streams, each stream is about an particular sensor, as shown in the image below:

enter image description here

Each event stream of sensors has the latitude and longitude of the sensor. The sensors placed on different coordinates and I want to compare/correlate their coordinates (latitude and longitude). How can I compare their coordinates through Siddhi query (coordinates do not vary by more than 4 meters)? Thanks a lot!


Solution

  • Siddhi has a separate set of operations for geo operations. To calculate the distance between the sensors, you will be able to use geo distance function.

    Since you want to compare the sensor distance, you will have to keep the sensor locations in a window or in a table. Whenever a sensor location comes in the stream, you will be able to store it in the window or in the table, and join with the rest of the table contents and get the relevant output adheres to your requirement(within 4 meters)

    /* Enter a unique ExecutionPlan */
    @Plan:name('ExecutionPlan')
    
    /* Enter a unique description for ExecutionPlan */
    -- @Plan:description('ExecutionPlan')
    
    /* define streams/tables and write queries here ... */
    
    @Import('sensor4:1.0.0')
    define stream sensor4 (id int, lat double, long double);
    
    @Import('sensor3:1.0.0')
    define stream sensor3 (id int, lat double, long double);
    
    @Import('sensor2:1.0.0')
    define stream sensor2 (id int, lat double, long double);
    
    @Import('sensor1:1.0.0')
    define stream sensor1 (id int, lat double, long double);
    
    @Export('measuredStream:1.0.0')
    define stream measuredStream (sensor1Id int, sensor2Id int);
    
    define table sensorTable (id int, lat double, long double);
    
    from sensor1
    select *
    insert into sensorTable;
    
    from sensor2
    select *
    insert into sensorTable;
    
    from sensor1 join sensorTable
        on sensorTable.id != sensor1.id and 4 > geo:distance(sensorTable.lat, sensorTable.long, sensor1.lat, sensor1.long)
    select sensorTable.id as sensor1Id, sensor1.id as sensor2Id
    insert into measuredStream;
    
    from sensor2 join sensorTable
        on sensorTable.id != sensor2.id and 4 > geo:distance(sensorTable.lat, sensorTable.long, sensor2.lat, sensor2.long)
    select sensorTable.id as sensor1Id, sensor2.id as sensor2Id
    insert into measuredStream;
    

    The above execution plan will work, since you have after you create 2 more queries each to insert to the event table and join with event table. You can verify the results by adding a logger publisher(the events in the measured stream will be logged in the terminal) to the measured stream. And using event simulator, simulate the event flow.