
Siddhi stream schemaless data


My source data comes from ActiveMQ. The problem is that the data does not have a fixed structure, so when I define the stream it throws an incompatible data type error. Is there any way to handle the source stream conditionally, based on the content of each message?

Thanks in advance.

/*
* Origin of data.
*/
@source(type='jms',
        @map(type='csv', delimiter=',', fail.on.unknown.attribute='false'),
        factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory',
        provider.url='tcp://127.0.0.1:61616',
        destination='simulatedData',
        connection.factory.type='queue',
        connection.factory.jndi.name='QueueConnectionFactory',
        transport.jms.SubscriptionDurable='true',
        transport.jms.DurableSubscriberClientID='wso2SPclient1')


define stream FileSourceProductionStream(type string, time long, studentId string, fileId string, totalAccesses float); /* totalAccesses : float Incompatible DataType*/
define stream TaskSourceProductionStream(type string, time long, studentId string, taskId string, deadline long); /*deadline: long Incompatible DataType*/

Solution

  • In Siddhi, the source stream must have the same schema as the input data, so schemaless data cannot be received into a defined stream.

    One possible solution for your scenario is to define a single stream with all the possible input attributes and to pre-format the input, with attribute names and values, as JSON [1], XML [2] or TEXT [3], which are the formats supported by the siddhi-map extensions. A missing attribute then simply has no key or tag in the JSON/XML/text input payload.

    Then set fail.on.missing.attribute='false' in the @map annotation of the source configuration, for example @map(type='json', fail.on.missing.attribute='false').

    Then for all the missing attributes in the input payload, NULL will be assigned to the corresponding attribute of the input stream.

    [1] https://wso2-extensions.github.io/siddhi-map-json/api/4.0.20/

    [2] https://wso2-extensions.github.io/siddhi-map-xml/api/4.0.12/

    [3] https://wso2-extensions.github.io/siddhi-map-text/api/1.0.16/
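
The approach described above could be sketched roughly as follows, assuming the producer is changed to send JSON in the default siddhi-map-json layout (a top-level "event" object). The stream name ProductionStream and the type values 'file' and 'task' are hypothetical, chosen only for illustration; the connection settings are copied from the question.

```siddhi
/*
 * Assumed payloads (hypothetical values), one JSON object per JMS message:
 *   {"event": {"type": "file", "time": 1546300800000, "studentId": "s1", "fileId": "f1", "totalAccesses": 3.0}}
 *   {"event": {"type": "task", "time": 1546300800000, "studentId": "s1", "taskId": "t1", "deadline": 1546387200000}}
 */
@source(type='jms',
        @map(type='json', fail.on.missing.attribute='false'),
        factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory',
        provider.url='tcp://127.0.0.1:61616',
        destination='simulatedData',
        connection.factory.type='queue',
        connection.factory.jndi.name='QueueConnectionFactory')
-- One stream holding the union of all attributes; absent keys arrive as null.
define stream ProductionStream(type string, time long, studentId string, fileId string, totalAccesses float, taskId string, deadline long);

-- Route file events (the 'file' discriminator value is an assumption).
from ProductionStream[type == 'file']
select type, time, studentId, fileId, totalAccesses
insert into FileSourceProductionStream;

-- Route task events (the 'task' discriminator value is an assumption).
from ProductionStream[type == 'task']
select type, time, studentId, taskId, deadline
insert into TaskSourceProductionStream;
```

With this layout the two original streams no longer need their own sources; they are inferred from the insert into clauses, and each query only sees the attributes that are meaningful for its event type.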