Search code examples
wso2siddhiwso2-streaming-integrator

WSO2 Stream Processor: unable to parse json messages


I am using stream processor 4.3.0 I have created one siddhi app which has source as mqtt and message type as json

and in sink as well I am using mqtt and message as json. Basically, there is no transformation of message required.

in source mqtt topic messages are in following way

{
"value1" : 59.698437,
"value2" : 14.977777,
"valid" : true
}

which ideally should be sent to the sink mqtt broker topic.

Now, to test this, I am using event simulator in /editor to test the sidhi app. After entering the dummy values, this generates feed as

{
"event" : {
    "value1" : "59.698437",
    "value2" : "14.977777",
    "valid" : true
}

which is successfully transferred to the sink topic.

Now, in the actual message feed and generated by simulator has difference. It has event object in message which is why the editor understand that and make other messages (which does not has event object) as invalid. Is there any way, that stream processor can process the feeds without events as well and how can this be checked that sinked messages are not having event?


Solution

  • You have to use custom mapping in JSON mapper type to parse the desired JSON input

    @source(type='mqtt', 
    @map(type='json', enclosing.element="$", @attributes(value1 = "value1", value2 = "value2", isValid = "valid")))
    define stream InputStream(value1 string, value2 string, isValid bool);
    

    See examples under API docs for more info, https://siddhi-io.github.io/siddhi-map-json/api/4.1.1/#json-source-mapper

    You can use log sink type to check the published event. Make sure to use the same map configs,

    @sink(type='log', 
    @map(type='json', enclosing.element="$", @attributes(value1 = "value1", value2 = "value2", isValid = "valid")))