Search code examples
wso2complex-event-processingsiddhiwso2-streaming-integratorei

Publish statistic from wso2 EI to wso2 Stream Processor


I need to know how is it possible to publish statistics via event publisher from Enteprise Integrator to Stream Processor.

I have following implementation of event publisher on my EI

<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="MessageFlowStatisticsPublisher"
  statistics="enable" trace="enable" xmlns="http://wso2.org/carbon/eventpublisher">
  <from streamName="org.wso2.esb.analytics.stream.FlowEntry" version="1.0.0"/>
  <mapping customMapping="disable" type="wso2event"/>
  <to eventAdapterType="wso2event">
    <property name="username">admin</property>
    <property name="protocol">thrift</property>
    <property name="publishingMode">non-blocking</property>
    <property name="publishTimeout">0</property>
    <property name="receiverURL">tcp://xxx:7611</property>
    <property encrypted="true" name="password">xxx</property>
  </to>
</eventPublisher>

On Stream Processor I have simple siddhi app for receive data and print them into log as is shown below

@App:name("FlowEntryApp")
@App:description("Plan of flow entry")

@source(type='wso2event', @map(type = 'wso2event'))
define stream FlowEntry(compressed bool, tenantId int, messageId string, flowData string);

@sink(type='log', prefix='My flowEntry:')
define stream TestOutputFlowEntry(messageId string, flowData string);

@info(name='FlowEntryOutput')
from FlowEntry
select messageId, flowData
group by messageId
insert into TestOutputFlowEntry;

Also I have setted all configuration for publishing statstics as "enable statistic" and "enable trace" for my proxy service. When I invoke my service, eventPublisher send wso2event to SP, this is working correctly. But on the SP side, SP handle error "No StreamDefinition for streamId org.wso2.esb.analytics.stream.FlowEntry:1.0.0 present in cache"

I know, that problem is in siddhi app, that I define stream "FlowEntry" instead of "org.wso2.esb.analytics.stream.FlowEntry" but siddhi language doesn't support characters like '.' in stream name.

So I tried to change stream name on EI site, change streamName in eventPublisher to 'FlowEntry' only, also I changed streamName in json file inside eventstream folder but now when I invoke my service, EI will not send any events to SP.

Have anybody idea how to publish org.wso2.esb.analytics.stream.FlowEntry stream to SP and then processed it by siddhi?


Solution

  • The stream name can be overridden by using wso2.stream.id element in the source annotation.

    @source(type='wso2event', wso2.stream.id='org.wso2.esb.analytics.stream.FlowEntry', @map(type = 'wso2event'))<br>
    define stream FlowEntry(compressed bool, tenantId int, messageId string, flowData string);
    

    By using the above source definition, 'FlowEntry' can still be used inside the Siddhi App, while in the thrift server stream id will be defined as 'org.wso2.esb.analytics.stream.FlowEntry:1.0.0'.

    Best Regards.