Search code examples
wso2apache-stormwso2-cep

Siddhi Event Tables are not properly working with Apache storm


I have tried to use event tables in my execution plan and try to join table with input stream in WSO2 CEP v4.1.0 in distributed mode with Apache Storm.

Here is my Siddhi query.

@Plan:name('ExecutionPlan')

@Import('InputStream:1.0.0')
define stream InputStream (id string, param1 int, param2 double, param3 string, param4 string, param5 string, param6 string, param7 string);

@Export('outputStream:1.0.0')
define stream OutputStream (id string, param3 string);

@From(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep') 
define table cepTable (id string, param1 int, param2 double, param3 string, param4 string, param5 string, param6 string, param7 string);

@name('query1') 
@dist(parallel='2', execGroup='Filtering')
partition with ( id of InputStream )
begin
  from InputStream join cepTable 
  on cepTable.id == InputStream.id 
  select InputStream.id as id, InputStream.param3 as param3  
  insert into OutputStream;
end;

But it provide following exceptions

TID: [-1234] [] [2016-05-13 14:19:11,847] ERROR {org.wso2.carbon.event.processor.admin.EventProcessorAdminService} -  Error while initialising the connection, null 
org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException: Error while initialising the connection, null
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.processDeploy(EventProcessorDeployer.java:154)
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.executeManualDeployment(EventProcessorDeployer.java:178)
    at org.wso2.carbon.event.processor.core.internal.util.EventProcessorConfigurationFilesystemInvoker.save(EventProcessorConfigurationFilesystemInvoker.java:95)
    at org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService.editInactiveExecutionPlan(CarbonEventProcessorService.java:181)
    at org.wso2.carbon.event.processor.admin.EventProcessorAdminService.editInactiveExecutionPlan(EventProcessorAdminService.java:108)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.axis2.rpc.receivers.RPCUtil.invokeServiceClass(RPCUtil.java:212)
    at org.apache.axis2.rpc.receivers.RPCMessageReceiver.invokeBusinessLogic(RPCMessageReceiver.java:117)
    at org.apache.axis2.receivers.AbstractInOutMessageReceiver.invokeBusinessLogic(AbstractInOutMessageReceiver.java:40)
    at org.apache.axis2.receivers.AbstractMessageReceiver.receive(AbstractMessageReceiver.java:110)
    at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
    at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:169)
    at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:82)
    at org.wso2.carbon.core.transports.local.CarbonLocalTransportSender.finalizeSendWithToAddress(CarbonLocalTransportSender.java:45)
    at org.apache.axis2.transport.local.LocalTransportSender.invoke(LocalTransportSender.java:77)
    at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:442)
    at org.apache.axis2.description.OutInAxisOperationClient.send(OutInAxisOperation.java:430)
    at org.apache.axis2.description.OutInAxisOperationClient.executeImpl(OutInAxisOperation.java:225)
    at org.apache.axis2.client.OperationClient.execute(OperationClient.java:149)
    at org.wso2.carbon.event.processor.stub.EventProcessorAdminServiceStub.editInactiveExecutionPlan(EventProcessorAdminServiceStub.java:2473)
    at org.apache.jsp.eventprocessor.edit_005fexecution_005fplan_005fajaxprocessor_jsp._jspService(edit_005fexecution_005fplan_005fajaxprocessor_jsp.java:84)
    at org.apache.jasper.runtime.HttpJspBase.service(HttpJspBase.java:70)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.apache.jasper.servlet.JspServletWrapper.service(JspServletWrapper.java:432)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:395)
    at org.apache.jasper.servlet.JspServlet.service(JspServlet.java:339)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.wso2.carbon.ui.JspServlet.service(JspServlet.java:155)
    at org.wso2.carbon.ui.TilesJspServlet.service(TilesJspServlet.java:80)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.eclipse.equinox.http.helper.ContextPathServletAdaptor.service(ContextPathServletAdaptor.java:37)
    at org.eclipse.equinox.http.servlet.internal.ServletRegistration.service(ServletRegistration.java:61)
    at org.eclipse.equinox.http.servlet.internal.ProxyServlet.processAlias(ProxyServlet.java:128)
    at org.eclipse.equinox.http.servlet.internal.ProxyServlet.service(ProxyServlet.java:68)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.service(DelegationServlet.java:68)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.ui.filters.CSRFPreventionFilter.doFilter(CSRFPreventionFilter.java:88)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.ui.filters.CRLFPreventionFilter.doFilter(CRLFPreventionFilter.java:59)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.tomcat.ext.filter.CharacterSetFilter.doFilter(CharacterSetFilter.java:61)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:504)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.wso2.carbon.tomcat.ext.valves.CompositeValve.continueInvocation(CompositeValve.java:99)
    at org.wso2.carbon.tomcat.ext.valves.CarbonTomcatValve$1.invoke(CarbonTomcatValve.java:47)
    at org.wso2.carbon.webapp.mgt.TenantLazyLoaderValve.invoke(TenantLazyLoaderValve.java:57)
    at org.wso2.carbon.event.receiver.core.internal.tenantmgt.TenantLazyLoaderValve.invoke(TenantLazyLoaderValve.java:48)
    at org.wso2.carbon.tomcat.ext.valves.TomcatValveContainer.invokeValves(TomcatValveContainer.java:47)
    at org.wso2.carbon.tomcat.ext.valves.CompositeValve.invoke(CompositeValve.java:62)
    at org.wso2.carbon.tomcat.ext.valves.CarbonStuckThreadDetectionValve.invoke(CarbonStuckThreadDetectionValve.java:159)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950)
    at org.wso2.carbon.tomcat.ext.valves.CarbonContextCreatorValve.invoke(CarbonContextCreatorValve.java:57)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:421)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1074)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:611)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1739)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1698)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException: Error while initialising the connection, null
    at org.wso2.siddhi.extension.eventtable.rdbms.DBHandler.<init>(DBHandler.java:79)
    at org.wso2.siddhi.extension.eventtable.RDBMSEventTable.init(RDBMSEventTable.java:119)
    at org.wso2.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventTable(DefinitionParserHelper.java:99)
    at org.wso2.siddhi.core.util.ExecutionPlanRuntimeBuilder.defineTable(ExecutionPlanRuntimeBuilder.java:74)
    at org.wso2.siddhi.core.util.parser.ExecutionPlanParser.defineTableDefinitions(ExecutionPlanParser.java:194)
    at org.wso2.siddhi.core.util.parser.ExecutionPlanParser.parse(ExecutionPlanParser.java:140)
    at org.wso2.siddhi.core.SiddhiManager.createExecutionPlanRuntime(SiddhiManager.java:53)
    at org.wso2.siddhi.core.SiddhiManager.createExecutionPlanRuntime(SiddhiManager.java:61)
    at org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.init(SiddhiBolt.java:104)
    at org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.<init>(SiddhiBolt.java:86)
    at org.wso2.carbon.event.processor.core.internal.storm.util.StormTopologyConstructor.constructTopologyBuilder(StormTopologyConstructor.java:114)
    at org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager.submitTopology(StormTopologyManager.java:127)
    at org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService.addExecutionPlan(CarbonEventProcessorService.java:314)
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.processDeploy(EventProcessorDeployer.java:124)
    ... 76 more
Caused by: java.sql.SQLException
    at org.apache.tomcat.jdbc.pool.PooledConnection.connectUsingDriver(PooledConnection.java:254)
    at org.apache.tomcat.jdbc.pool.PooledConnection.connect(PooledConnection.java:182)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.createConnection(ConnectionPool.java:701)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.borrowConnection(ConnectionPool.java:635)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.getConnection(ConnectionPool.java:188)
    at org.apache.tomcat.jdbc.pool.DataSourceProxy.getConnection(DataSourceProxy.java:127)
    at org.wso2.siddhi.extension.eventtable.rdbms.DBHandler.<init>(DBHandler.java:73)
    ... 89 more
Caused by: java.lang.NullPointerException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.tomcat.jdbc.pool.PooledConnection.connectUsingDriver(PooledConnection.java:246)
    ... 95 more

What is the reason for it?


Solution

  • When the execution plan is deployed on WSO2CEP server, having following annotations would be enough,

    @From(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep')
    

    given that we have defined a datasource called 'MYSQL' among the datasource configurations in the WSO2CEP server.

    However, when the exceution plan is deployed on Storm, datasource configuration has to be specified inline (because datasource configurations defined in the WSO2CEP server are not available within Storm). This can be done as below:

    @From(eventtable = 'rdbms' , jdbc.url='', username='', password='', driver.name='' , table.name = 'cep')
    

    Fill in the annotation elements appropriately.

    Hope this will resolve your issue.