Joining streams of different types in Apache Edgent


I have 3 streams:

TStream<Double> tempReadings = topology.poll(tempSensor, 10, TimeUnit.SECONDS);
TStream<Double> co2Readings = topology.poll(co2Sensor, 10, TimeUnit.SECONDS);
TStream<Boolean> stationaryReadings = topology.poll(stationarySensor, 10, TimeUnit.SECONDS);

I currently create 3 separate device events from 3 JSON objects:

TStream<JsonObject> tempJson = tempReadings.map(tuple -> {
    JsonObject json = new JsonObject();
    json.addProperty("Temperature", tuple);
    return json;
});
TStream<JsonObject> co2Json = co2Readings.map(tuple -> {
    JsonObject json = new JsonObject();
    json.addProperty("C02Level", tuple);
    return json;
});
TStream<JsonObject> sensoryJson = stationaryReadings.map(tuple -> {
    JsonObject json = new JsonObject();
    json.addProperty("isStationary", tuple);
    return json;
});

Instead, I would like to create a single event by joining these streams into one JSON object with three properties (Temperature, C02Level and isStationary).


Solution

  • You can union the streams, but a union only passes the tuples through one after another rather than combining them, and it requires all of the streams to have the same tuple type.

    If you want to read all 3 properties at once, you could create a sensor that returns a "readings" object:

    class Reading {
        Double temperature;
        Double c02Level;
        Boolean isStationary;
    }
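
    As a rough sketch of that idea, the three existing sensors could be wrapped in one supplier that samples them together. The names CombinedSensor, combine and toProperties are hypothetical, not part of Edgent. To keep the sketch runnable without extra libraries it uses java.util.function.Supplier and a plain Map as a stand-in for the Gson JsonObject from the question:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper, not part of Edgent: samples all three sensors in one call.
class CombinedSensor {

    // Same shape as the Reading class above.
    static class Reading {
        Double temperature;
        Double c02Level;
        Boolean isStationary;
    }

    // Wraps three single-value sensors in one supplier that returns a Reading.
    static Supplier<Reading> combine(Supplier<Double> temp,
                                     Supplier<Double> co2,
                                     Supplier<Boolean> stationary) {
        return () -> {
            Reading r = new Reading();
            r.temperature = temp.get();
            r.c02Level = co2.get();
            r.isStationary = stationary.get();
            return r;
        };
    }

    // Stand-in for the JsonObject in the question: one object, three properties.
    static Map<String, Object> toProperties(Reading r) {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("Temperature", r.temperature);
        json.put("C02Level", r.c02Level);
        json.put("isStationary", r.isStationary);
        return json;
    }

    public static void main(String[] args) {
        // Fixed values stand in for real sensor reads.
        Supplier<Reading> sensor = combine(() -> 21.5, () -> 400.0, () -> false);
        System.out.println(toProperties(sensor.get()));
    }
}
```

    In an Edgent topology the combined supplier would presumably be declared as an org.apache.edgent.function.Supplier<Reading> and passed to topology.poll(sensor, 10, TimeUnit.SECONDS), giving a TStream<Reading>. A single map call could then build the JsonObject with three addProperty calls, in the same way as the per-sensor map calls in the question.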