I have a source data from ActiveMQ, the problem that I have is that these data do not have a fixed structure, therefore, when I define the stream it throws an incompatible datatype error, is there any way to condition the source stream by some condition?
Thanks in advance.
/*
* Origin of data.
*/
@source(type='jms',
@map(type='csv', delimiter=',', fail.on.unknown.attribute='false'),
factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory',
provider.url='tcp://127.0.0.1:61616',
destination='simulatedData',
connection.factory.type='queue',
connection.factory.jndi.name='QueueConnectionFactory',
transport.jms.SubscriptionDurable='true',
transport.jms.DurableSubscriberClientID='wso2SPclient1')
define stream FileSourceProductionStream(type string, time long, studentId string, fileId string, totalAccesses float); /* totalAccesses : float Incompatible DataType*/
define stream TaskSourceProductionStream(type string, time long, studentId string, taskId string, deadline long); /*deadline: long Incompatible DataType*/
In siddhi, the source stream should have the schema of the input data. Therefore, we cannot have received schemaless data to a defined stream.
One possible solution for your scenario is defining a stream with all the possible input attributes and pre-format the input to a JSON[1], XML[2] or TEXT[3] formats which are supported by the siddhi-map extensions, with attribute names and values. For missing attributes, there won't be a key or a tag in the json/xml/text input payload.
Then use @map(fail.on.missing.attributes='false) configuration in the source config.
Then for all the missing attributes in the input payload, NULL will be assigned to the corresponding attribute of the input stream.
[1] https://wso2-extensions.github.io/siddhi-map-json/api/4.0.20/
[2] https://wso2-extensions.github.io/siddhi-map-xml/api/4.0.12/
[3] https://wso2-extensions.github.io/siddhi-map-text/api/1.0.16/