I have tried to use event tables in my execution plan and try to join table with input stream in WSO2 CEP v4.1.0 in distributed mode with Apache Storm.
Here is my Siddhi query.
@Plan:name('ExecutionPlan')
@Import('InputStream:1.0.0')
define stream InputStream (id string, param1 int, param2 double, param3 string, param4 string, param5 string, param6 string, param7 string);
@Export('outputStream:1.0.0')
define stream OutputStream (id string, param3 string);
@From(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep')
define table cepTable (id string, param1 int, param2 double, param3 string, param4 string, param5 string, param6 string, param7 string);
@name('query1')
@dist(parallel='2', execGroup='Filtering')
partition with ( id of InputStream )
begin
from InputStream join cepTable
on cepTable.id == InputStream.id
select InputStream.id as id, InputStream.param3 as param3
insert into OutputStream;
end;
But it provide following exceptions
TID: [-1234] [] [2016-05-13 14:19:11,847] ERROR {org.wso2.carbon.event.processor.admin.EventProcessorAdminService} - Error while initialising the connection, null
org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException: Error while initialising the connection, null
at org.wso2.carbon.event.processor.core.EventProcessorDeployer.processDeploy(EventProcessorDeployer.java:154)
at org.wso2.carbon.event.processor.core.EventProcessorDeployer.executeManualDeployment(EventProcessorDeployer.java:178)
at org.wso2.carbon.event.processor.core.internal.util.EventProcessorConfigurationFilesystemInvoker.save(EventProcessorConfigurationFilesystemInvoker.java:95)
at org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService.editInactiveExecutionPlan(CarbonEventProcessorService.java:181)
at org.wso2.carbon.event.processor.admin.EventProcessorAdminService.editInactiveExecutionPlan(EventProcessorAdminService.java:108)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.axis2.rpc.receivers.RPCUtil.invokeServiceClass(RPCUtil.java:212)
at org.apache.axis2.rpc.receivers.RPCMessageReceiver.invokeBusinessLogic(RPCMessageReceiver.java:117)
at org.apache.axis2.receivers.AbstractInOutMessageReceiver.invokeBusinessLogic(AbstractInOutMessageReceiver.java:40)
at org.apache.axis2.receivers.AbstractMessageReceiver.receive(AbstractMessageReceiver.java:110)
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:169)
at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:82)
at org.wso2.carbon.core.transports.local.CarbonLocalTransportSender.finalizeSendWithToAddress(CarbonLocalTransportSender.java:45)
at org.apache.axis2.transport.local.LocalTransportSender.invoke(LocalTransportSender.java:77)
at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:442)
at org.apache.axis2.description.OutInAxisOperationClient.send(OutInAxisOperation.java:430)
at org.apache.axis2.description.OutInAxisOperationClient.executeImpl(OutInAxisOperation.java:225)
at org.apache.axis2.client.OperationClient.execute(OperationClient.java:149)
at org.wso2.carbon.event.processor.stub.EventProcessorAdminServiceStub.editInactiveExecutionPlan(EventProcessorAdminServiceStub.java:2473)
at org.apache.jsp.eventprocessor.edit_005fexecution_005fplan_005fajaxprocessor_jsp._jspService(edit_005fexecution_005fplan_005fajaxprocessor_jsp.java:84)
at org.apache.jasper.runtime.HttpJspBase.service(HttpJspBase.java:70)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.apache.jasper.servlet.JspServletWrapper.service(JspServletWrapper.java:432)
at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:395)
at org.apache.jasper.servlet.JspServlet.service(JspServlet.java:339)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.wso2.carbon.ui.JspServlet.service(JspServlet.java:155)
at org.wso2.carbon.ui.TilesJspServlet.service(TilesJspServlet.java:80)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.eclipse.equinox.http.helper.ContextPathServletAdaptor.service(ContextPathServletAdaptor.java:37)
at org.eclipse.equinox.http.servlet.internal.ServletRegistration.service(ServletRegistration.java:61)
at org.eclipse.equinox.http.servlet.internal.ProxyServlet.processAlias(ProxyServlet.java:128)
at org.eclipse.equinox.http.servlet.internal.ProxyServlet.service(ProxyServlet.java:68)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.service(DelegationServlet.java:68)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.wso2.carbon.ui.filters.CSRFPreventionFilter.doFilter(CSRFPreventionFilter.java:88)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.wso2.carbon.ui.filters.CRLFPreventionFilter.doFilter(CRLFPreventionFilter.java:59)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.wso2.carbon.tomcat.ext.filter.CharacterSetFilter.doFilter(CharacterSetFilter.java:61)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:504)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.wso2.carbon.tomcat.ext.valves.CompositeValve.continueInvocation(CompositeValve.java:99)
at org.wso2.carbon.tomcat.ext.valves.CarbonTomcatValve$1.invoke(CarbonTomcatValve.java:47)
at org.wso2.carbon.webapp.mgt.TenantLazyLoaderValve.invoke(TenantLazyLoaderValve.java:57)
at org.wso2.carbon.event.receiver.core.internal.tenantmgt.TenantLazyLoaderValve.invoke(TenantLazyLoaderValve.java:48)
at org.wso2.carbon.tomcat.ext.valves.TomcatValveContainer.invokeValves(TomcatValveContainer.java:47)
at org.wso2.carbon.tomcat.ext.valves.CompositeValve.invoke(CompositeValve.java:62)
at org.wso2.carbon.tomcat.ext.valves.CarbonStuckThreadDetectionValve.invoke(CarbonStuckThreadDetectionValve.java:159)
at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950)
at org.wso2.carbon.tomcat.ext.valves.CarbonContextCreatorValve.invoke(CarbonContextCreatorValve.java:57)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:421)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1074)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:611)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1739)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1698)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException: Error while initialising the connection, null
at org.wso2.siddhi.extension.eventtable.rdbms.DBHandler.<init>(DBHandler.java:79)
at org.wso2.siddhi.extension.eventtable.RDBMSEventTable.init(RDBMSEventTable.java:119)
at org.wso2.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventTable(DefinitionParserHelper.java:99)
at org.wso2.siddhi.core.util.ExecutionPlanRuntimeBuilder.defineTable(ExecutionPlanRuntimeBuilder.java:74)
at org.wso2.siddhi.core.util.parser.ExecutionPlanParser.defineTableDefinitions(ExecutionPlanParser.java:194)
at org.wso2.siddhi.core.util.parser.ExecutionPlanParser.parse(ExecutionPlanParser.java:140)
at org.wso2.siddhi.core.SiddhiManager.createExecutionPlanRuntime(SiddhiManager.java:53)
at org.wso2.siddhi.core.SiddhiManager.createExecutionPlanRuntime(SiddhiManager.java:61)
at org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.init(SiddhiBolt.java:104)
at org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.<init>(SiddhiBolt.java:86)
at org.wso2.carbon.event.processor.core.internal.storm.util.StormTopologyConstructor.constructTopologyBuilder(StormTopologyConstructor.java:114)
at org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager.submitTopology(StormTopologyManager.java:127)
at org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService.addExecutionPlan(CarbonEventProcessorService.java:314)
at org.wso2.carbon.event.processor.core.EventProcessorDeployer.processDeploy(EventProcessorDeployer.java:124)
... 76 more
Caused by: java.sql.SQLException
at org.apache.tomcat.jdbc.pool.PooledConnection.connectUsingDriver(PooledConnection.java:254)
at org.apache.tomcat.jdbc.pool.PooledConnection.connect(PooledConnection.java:182)
at org.apache.tomcat.jdbc.pool.ConnectionPool.createConnection(ConnectionPool.java:701)
at org.apache.tomcat.jdbc.pool.ConnectionPool.borrowConnection(ConnectionPool.java:635)
at org.apache.tomcat.jdbc.pool.ConnectionPool.getConnection(ConnectionPool.java:188)
at org.apache.tomcat.jdbc.pool.DataSourceProxy.getConnection(DataSourceProxy.java:127)
at org.wso2.siddhi.extension.eventtable.rdbms.DBHandler.<init>(DBHandler.java:73)
... 89 more
Caused by: java.lang.NullPointerException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at org.apache.tomcat.jdbc.pool.PooledConnection.connectUsingDriver(PooledConnection.java:246)
... 95 more
What is the reason for it?
When the execution plan is deployed on WSO2CEP server, having following annotations would be enough,
@From(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep')
given that we have defined a datasource called 'MYSQL'
among the datasource configurations in the WSO2CEP server.
However, when the exceution plan is deployed on Storm, datasource configuration has to be specified inline (because datasource configurations defined in the WSO2CEP server are not available within Storm). This can be done as below:
@From(eventtable = 'rdbms' , jdbc.url='', username='', password='', driver.name='' , table.name = 'cep')
Fill in the annotation elements appropriately.
Hope this will resolve your issue.