java apache-edgent

What exactly is a tuple in TStream?


The code below is taken from the Apache Edgent documentation. I do not understand what exactly tuple is in it:

public static void main(String[] args) throws Exception {
    TempSensor sensor = new TempSensor();
    DirectProvider dp = new DirectProvider();
    Topology topology = dp.newTopology();

    TStream<Double> tempReadings = topology.poll(sensor, 1, TimeUnit.MILLISECONDS);
    TStream<Double> simpleFiltered = tempReadings.filter(tuple ->
            !optimalTempRangeRef.get().contains(tuple));
    simpleFiltered.sink(tuple -> System.out.println("Temperature is out of range! "
            + "It is " + tuple + "\u00b0F!"));

    tempReadings.print();

    dp.submit(topology);
}

The error I get is "tuple cannot be resolved to a variable". What does this error mean? Thank you.


Solution

  • The TStream<T> interface models a stream of data, typically sensor readings. T is the type used to store an individual reading, but a 'reading' can consist of several numbers (e.g. temperature, humidity and wind speed) joined together into a single composite type, which is generically referred to here as a 'tuple' of values.

    However, in your example we are dealing with a stream of simple temperature readings, so T is a single number of type Double. The choice of 'tuple' as a variable name is therefore a bit confusing (mathematically it is a 1-tuple, but here that just means 'a number').
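
To make the naming point concrete, here is a minimal sketch. It uses plain java.util.stream in place of Edgent's TStream (an assumption, so that it runs without Edgent on the classpath), and the WeatherReading class is hypothetical. It only illustrates that the lambda parameter is an ordinary name for one stream element, whatever the element type is.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TupleNamingSketch {
    // Hypothetical composite reading: one "tuple" carrying several values.
    static final class WeatherReading {
        final double temperatureF;
        final double humidityPct;

        WeatherReading(double temperatureF, double humidityPct) {
            this.temperatureF = temperatureF;
            this.humidityPct = humidityPct;
        }
    }

    // Element type is Double, so each "tuple" is just one number.
    static List<Double> above(List<Double> readings, double limit) {
        return readings.stream()
                .filter(reading -> reading > limit) // "reading" could equally be named "tuple"
                .collect(Collectors.toList());
    }

    // Element type is WeatherReading, so each "tuple" is a composite value.
    static List<WeatherReading> hotAndHumid(List<WeatherReading> readings) {
        return readings.stream()
                .filter(tuple -> tuple.temperatureF > 80.0 && tuple.humidityPct > 60.0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(above(Arrays.asList(75.5, 82.1, 79.9), 80.0));
        System.out.println(hotAndHumid(Arrays.asList(
                new WeatherReading(85.0, 70.0),
                new WeatherReading(85.0, 40.0))).size());
    }
}
```

Renaming the parameter in either lambda changes nothing about the behaviour, which is why a name like reading is arguably clearer than tuple for a stream of Double.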

    In your code, the filter() method takes a predicate, which here is

    tuple -> !optimalTempRangeRef.get().contains(tuple)
    

    Here optimalTempRangeRef.get() returns a Range<Double>, so the predicate asks "is our temperature value outside the optimal range?"

    From the docs for Range:

    contains() is used to check for containment: e.g.
    
     Ranges.closed(2,4).contains(2);    // returns true
     Ranges.open(2,4).contains(2);      // returns false
     Ranges.atLeast(2).contains(2);     // returns true
     Ranges.greaterThan(2).contains(2); // returns false
     Ranges.atMost(2).contains(2);      // returns true
     Ranges.lessThan(2).contains(2);    // returns false
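
The way the predicate combines with a closed range can be sketched as follows. This is not Edgent code: ClosedRange is a hypothetical stand-in for a closed Range, the bounds 77.0 and 91.0 are made-up values, and optimalTempRangeRef is assumed to be an AtomicReference because the original code calls get() on it.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class OutOfRangeSketch {
    // Hypothetical stand-in for a closed range [low, high]; not Edgent's Range class.
    static final class ClosedRange {
        final double low;
        final double high;

        ClosedRange(double low, double high) {
            this.low = low;
            this.high = high;
        }

        // Both endpoints count as inside, like the closed case in the docs excerpt.
        boolean contains(double value) {
            return value >= low && value <= high;
        }
    }

    // Assumed shape of optimalTempRangeRef: a holder whose get() returns the current range.
    static final AtomicReference<ClosedRange> optimalTempRangeRef =
            new AtomicReference<>(new ClosedRange(77.0, 91.0));

    // Same logic as the filter predicate: true when the reading is outside the range.
    static final Predicate<Double> outOfRange =
            reading -> !optimalTempRangeRef.get().contains(reading);

    public static void main(String[] args) {
        System.out.println(outOfRange.test(77.0)); // false: the endpoint is inside a closed range
        System.out.println(outOfRange.test(95.2)); // true: above the upper bound
        System.out.println(outOfRange.test(60.0)); // true: below the lower bound
    }
}
```

Only readings for which the predicate returns true pass through filter(), so the filtered stream carries just the out-of-range temperatures.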
    

    EDIT:

    It looks like your IDE is having trouble with the Java 8 lambda syntax, so you can rewrite your code using anonymous inner classes, like this:

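    import java.util.concurrent.TimeUnit;

    import org.apache.edgent.function.Consumer;
    import org.apache.edgent.function.Predicate;
    import org.apache.edgent.providers.direct.DirectProvider;
    import org.apache.edgent.topology.TStream;
    import org.apache.edgent.topology.Topology;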
    
    
    public static void main( String[] args ) throws Exception
    {
        TempSensor sensor = new TempSensor();
        DirectProvider dp = new DirectProvider();
        Topology topology = dp.newTopology();
    
        TStream<Double> tempReadings = topology.poll( sensor, 1, TimeUnit.MILLISECONDS );
        TStream<Double> filteredStream = tempReadings.filter( new Predicate<Double>()
        {
            public boolean test( Double reading )
            {
                return !optimalTempRangeRef.get().contains( reading );
            }
        } );
    
        filteredStream.sink( new Consumer<Double>()
        {
            public void accept( Double reading )
            {
                System.out.println( "Temperature is out of range! "
                                    + "It is " + reading + "\u00b0F!" );
            }
        } );
    
        tempReadings.print();
    
        dp.submit( topology );
    }
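
As a quick check that the two forms are interchangeable, here is a small sketch using java.util.function.Predicate as a stand-in for Edgent's Predicate (an assumption, to keep it runnable without Edgent). The 91.0 threshold is a made-up value. If the lambda version is the only one your IDE rejects, a likely cause, though not confirmed from your post, is a compiler compliance level set below Java 8.

```java
import java.util.function.Predicate;

public class LambdaEquivalenceSketch {
    // Lambda form; requires the compiler source level to be Java 8 or later.
    static final Predicate<Double> asLambda = reading -> reading > 91.0;

    // Anonymous inner class form; same behaviour, written without lambda syntax.
    static final Predicate<Double> asAnonymousClass = new Predicate<Double>() {
        @Override
        public boolean test(Double reading) {
            return reading > 91.0;
        }
    };

    public static void main(String[] args) {
        double[] samples = {60.0, 91.0, 95.2};
        for (double sample : samples) {
            // Both predicates give the same answer for every sample.
            System.out.println(sample + " -> "
                    + asLambda.test(sample) + " / " + asAnonymousClass.test(sample));
        }
    }
}
```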