Is there any way to update the MySQL Db without updating the empty values in the stream. If my input data stream contains some empty values, currently that empty value is indicate using "data_empty" value. At that time CEP update the DB with that value ("data_empty"). My goal is without updating that empty value update the rest of things. Is it possible to do using siddhi and WSO2 CEP.
@Plan:name('DBUpdateExecutionPlan')
@Import('testStream:1.0.0')
define stream input (id string, param1 string, param2 string);
@Export('testOutStream:1.0.0')
define stream output (id string, param1 string, param2 string);
@from(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep')
define table cepTable (id string, param1 string, param2 string) ;
from input#window.time(0 sec)
select *
update cepTable on id == cepTable.id;
It's sort of difficult to update only the non-empty (not 'data_empty' in your scenario) values to the database. However in Siddhi, there's a function called ifThenElse(condition, value if true, value if false)
, which can be used in your scenario. Refer to the below sample execution plan for you to get an idea on using ifThenElse()
and table updating (similar to your usecase).
@Plan:name('IfThenElseExecutionPlan')
@Import('inputStream:1.0.0')
define stream dataIn (roomId int, roomType string, roomTemp float);
@Export('outputStream:1.0.0')
define stream dataOut (roomId int, roomType string, roomTemp float);
@From(eventtable='rdbms', datasource.name='cepdatabase', table.name='roomTable')
define table roomTable (roomId int, roomType string, roomTemp float);
from dataIn[not((roomTable.roomId == roomId) in roomTable)]
insert into updateStream;
from dataIn join roomTable
on roomTable.roomId == dataIn.roomId
select dataIn.roomId as roomId,
ifThenElse(dataIn.roomType=='data_empty', roomTable.roomType, dataIn.roomType) as roomType,
ifThenElse(dataIn.roomTemp==0.0f, roomTable.roomTemp, dataIn.roomTemp) as roomTemp
insert into updateStream;
from updateStream
insert overwrite roomTable
on roomTable.roomId == roomId;