
Is there a way to capture tuples/sec through an operator in IBM Streams (not through the Streams Console)?


I want to capture the number of tuples per second flowing through an operator and log it to a file. I can't use the Throttle operator to set the tuple rate myself. To be clear, I don't want to capture this information through the Streams Console, but from within the SPL application itself.


Solution

  • There is no built-in metric that directly reports an operator's throughput. You could implement a primitive operator that reads the nTuplesProcessed metric over time and calculates the throughput from it (the Streams documentation lists all available built-in metrics). However, I find it much easier to use the following composite operator:

    public composite PeriodicThroughputSink(input In) {
    param expression<float64> $period;
          expression<rstring> $file;
    graph
        stream<boolean b> Period = Beacon() {
            param period: $period;
        }
    
        stream<float64 throughput> Throughput = Custom(In; Period) {
            logic state: {
                mutable uint64 _count = 0ul;
                float64 _period = $period;
            }
    
            onTuple In: {
                ++_count;
            }
    
            onTuple Period: {
                if (_count > 0ul) {
                    submit({throughput=((float64)_count / _period)}, Throughput);
                    _count = 0ul;
                }
            }
    
            config threadedPort: queue(Period, Sys.Wait); // ensures that the throughput calculation and file
                                                          // writing run on a different thread from the rest
                                                          // of the application
        }
    
        () as Sink = FileSink(Throughput) {
            param file: $file;
                  format: txt;
                  flush: 1u;
        }
    }
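    As written, the composite logs only the rate, and it skips any period in which no tuples arrived. If you also want each line to record when it was measured, and want idle periods logged as 0, the Throughput operator could be swapped for something like the untested sketch below. It assumes the spl.utility function getTimestampInSecs(), which returns the current time in seconds as a float64; the timestampSecs attribute name is made up for illustration.

    ```spl
        // Hypothetical variant of the Throughput operator in PeriodicThroughputSink:
        // adds a timestamp to each sample and also reports periods with zero tuples.
        stream<float64 timestampSecs, float64 throughput> Throughput = Custom(In; Period) {
            logic state: {
                mutable uint64 _count = 0ul;
                float64 _period = $period;
            }

            onTuple In: {
                ++_count;
            }

            onTuple Period: {
                // Always submit, so an idle period shows up as a throughput of 0
                submit({timestampSecs = getTimestampInSecs(),
                        throughput = (float64)_count / _period}, Throughput);
                _count = 0ul;
            }

            config threadedPort: queue(Period, Sys.Wait);
        }
    ```

    The FileSink can stay unchanged: with format: txt it writes every attribute of the Throughput stream, so each line would then carry both the timestamp and the rate.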
    

    You can then use the composite operator as a "throughput tap" that consumes the output stream of whichever operator's throughput you want to record. For example:

    stream<Data> Result = OperatorYouCareAbout(In) {}
    
    () as ResultThroughput = PeriodicThroughputSink(Result) {
        param period: 5.0;
              file: "ResultThroughput.txt";
    } 
    

    Of course, you can still use the Result stream elsewhere in your application. Note that the tap counts the tuples the operator submits on its output stream, so for an operator that filters or aggregates, this rate can differ from the rate at which it consumes input. Keep in mind that this method may have some impact on application performance, since we're putting a tap on the data path. The impact should be small, particularly if the operators in PeriodicThroughputSink are fused into the same PE as the operator you're tapping. The shorter the period, the larger the overhead, because every period produces a Beacon tuple, a submission, and a flushed write to the file.
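    One way to ask for that fusion is a placement config that gives the tapped operator and the tap the same partition colocation tag. This is a sketch: "ResultTap" is an arbitrary tag chosen for illustration, and it relies on my understanding that a placement config on a composite invocation is applied to the operators inside the composite (Beacon, Custom and FileSink here).

    ```spl
        // Operators with the same partitionColocation tag are fused into one PE.
        // "ResultTap" is a hypothetical tag name; any string shared by both works.
        stream<Data> Result = OperatorYouCareAbout(In) {
            config placement: partitionColocation("ResultTap");
        }

        () as ResultThroughput = PeriodicThroughputSink(Result) {
            param period: 5.0;
                  file: "ResultThroughput.txt";
            // Applied to the operators inside the composite as well
            config placement: partitionColocation("ResultTap");
        }
    ```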

    Again, you could do something similar in a C++ or Java primitive operator by reading the nTuplesProcessed metric, but I find the approach above much easier. You could also collect the system metrics from outside your application: for example, a script could periodically run streamtool capturestate or query the REST API, parse the output, find the nTuplesProcessed metric for the operator you care about, and compute the throughput from how much that value changes between samples.
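    Whichever way you read the metric, the calculation is the same: take two samples and divide the change in the counter by the elapsed time. Writing N(t) for the nTuplesProcessed value read at time t (notation introduced here for illustration), with made-up sample values in the worked example:

    ```latex
    \text{throughput} \approx \frac{N(t_2) - N(t_1)}{t_2 - t_1}
    \qquad\text{e.g.}\qquad
    \frac{15\,000 - 12\,000}{10\ \text{s}} = 300\ \text{tuples/s}
    ```

    If the PE restarts between two samples, the counter may start again from zero, so a negative difference is better discarded than logged.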