Search code examples
wso2wso2-daswso2-streaming-integratorwso2-cep

Multiple Sources in WSO2 Stream Processor


I'm trying to configure one Siddhi application on WSO2 Stream Processor with two sources (both are files) but that doesn't work (one source works fine). However, even when I split the application in to Siddhi application that doesn't work too. There logs in both situations are the same as below:

[2018-01-25 08:51:20,583]  INFO {org.quartz.impl.StdSchedulerFactory} - Using default implementation for ThreadExecutor
[2018-01-25 08:51:20,586]  INFO {org.quartz.simpl.SimpleThreadPool} - Job execution threads will use class loader of thread: Timer-0
[2018-01-25 08:51:20,599]  INFO {org.quartz.core.SchedulerSignalerImpl} - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
[2018-01-25 08:51:20,599]  INFO {org.quartz.core.QuartzScheduler} - Quartz Scheduler v.2.3.0 created.
[2018-01-25 08:51:20,600]  INFO {org.quartz.simpl.RAMJobStore} - RAMJobStore initialized.
[2018-01-25 08:51:20,601]  INFO {org.quartz.core.QuartzScheduler} - Scheduler meta-data: Quartz Scheduler (v2.3.0) 'polling-task-runner' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 1 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

[2018-01-25 08:51:20,601]  INFO {org.quartz.impl.StdSchedulerFactory} - Quartz scheduler 'polling-task-runner' initialized from an externally provided properties instance.
[2018-01-25 08:51:20,601]  INFO {org.quartz.impl.StdSchedulerFactory} - Quartz scheduler version: 2.3.0
[2018-01-25 08:51:20,601]  INFO {org.quartz.core.QuartzScheduler} - Scheduler polling-task-runner_$_NON_CLUSTERED started.
[2018-01-25 08:51:20,604]  INFO {org.quartz.core.QuartzScheduler} - Scheduler polling-task-runner_$_NON_CLUSTERED started.
[2018-01-25 08:51:20,605] ERROR {org.wso2.carbon.connector.framework.server.polling.PollingTaskRunner} - Exception occurred while scheduling job org.quartz.ObjectAlreadyExistsException: Unable to store Trigger with name: 'scheduledPoll' and group: 'group1', because one already exists with this identification.
    at org.quartz.simpl.RAMJobStore.storeTrigger(RAMJobStore.java:415)
    at org.quartz.simpl.RAMJobStore.storeJobAndTrigger(RAMJobStore.java:252)
    at org.quartz.core.QuartzScheduler.scheduleJob(QuartzScheduler.java:855)
    at org.quartz.impl.StdScheduler.scheduleJob(StdScheduler.java:249)
    at org.wso2.carbon.connector.framework.server.polling.PollingTaskRunner.start(PollingTaskRunner.java:74)
    at org.wso2.carbon.connector.framework.server.polling.PollingServerConnector.start(PollingServerConnector.java:57)
    at org.wso2.carbon.transport.remotefilesystem.server.connector.contractimpl.RemoteFileSystemServerConnectorImpl.start(RemoteFileSystemServerConnectorImpl.java:75)
    at org.wso2.extension.siddhi.io.file.FileSource.deployServers(FileSource.java:537)
    at org.wso2.extension.siddhi.io.file.FileSource.connect(FileSource.java:370)
    at org.wso2.siddhi.core.stream.input.source.Source.connectWithRetry(Source.java:130)
    at org.wso2.siddhi.core.SiddhiAppRuntime.start(SiddhiAppRuntime.java:335)
    at org.wso2.carbon.stream.processor.core.internal.StreamProcessorService.deploySiddhiApp(StreamProcessorService.java:280)
    at org.wso2.carbon.stream.processor.core.internal.StreamProcessorDeployer.deploySiddhiQLFile(StreamProcessorDeployer.java:81)
    at org.wso2.carbon.stream.processor.core.internal.StreamProcessorDeployer.deploy(StreamProcessorDeployer.java:170)
    at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.lambda$deployArtifacts$0(DeploymentEngine.java:291)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.deployArtifacts(DeploymentEngine.java:282)
    at org.wso2.carbon.deployment.engine.internal.RepositoryScanner.sweep(RepositoryScanner.java:112)
    at org.wso2.carbon.deployment.engine.internal.RepositoryScanner.scan(RepositoryScanner.java:68)
    at org.wso2.carbon.deployment.engine.internal.DeploymentEngine.start(DeploymentEngine.java:121)
    at org.wso2.carbon.deployment.engine.internal.DeploymentEngineListenerComponent.onAllRequiredCapabilitiesAvailable(DeploymentEngineListenerComponent.java:216)
    at org.wso2.carbon.kernel.internal.startupresolver.StartupComponentManager.lambda$notifySatisfiableComponents$7(StartupComponentManager.java:266)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at org.wso2.carbon.kernel.internal.startupresolver.StartupComponentManager.notifySatisfiableComponents(StartupComponentManager.java:252)
    at org.wso2.carbon.kernel.internal.startupresolver.StartupOrderResolver$1.run(StartupOrderResolver.java:204)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

Anybody can throw an idea on how to overcome this?

Thanks.


Solution

  • Thanks for pointing this issue out. Seems like this is occurred due to scheduling two polling tasks with the same id. I have created an issue for this in git repository[1]. The fix will be shipped with an update soon.

    [1] https://github.com/wso2/product-sp/issues/463

    Best Regards!