
Kapacitor task with multiple percentiles


I want to aggregate the last minute of data from Telegraf with Kapacitor before writing it to InfluxDB, and I also need to calculate a few percentiles. So I wrote a simple TICKscript as a test:

var firstPerc = stream
    |from()
        .measurement('my_tmp_measurement_from_telegraf')
var secondPerc = stream
    |from()
        .measurement('my_tmp_measurement_from_telegraf')
firstPerc
    |join(secondPerc)
        .as('fp', 'sp')
        |percentile('fp.myAggVal', 50.0)
        |eval(lambda: "percentile")
            .as('50p')
        |percentile('sp.myAggVal', 90.0)
        |eval(lambda: "percentile")
            .as('90p')
        |window()
            .period(60s)
            .every(60s)
            .align()
        |influxDBOut()
            .database('myDBInInflux')
            .retentionPolicy('autogen')

In my database, I only have values for the 50th percentile. I am not surprised by that, since I use "percentile" in both evals, but I still cannot find any clue in the Kapacitor documentation about how to get the result I need.

Here is the "visual" result I am after:

time 50p 90p someOtherP's otherDataICanProbablyHandle

Halp!


Solution

  • You are reading the same measurement stream (and the same data in it) twice, so the data gets consumed. First, save the measurement stream in a variable:

    var myStream = stream
        |from()
            .measurement('my_tmp_measurement_from_telegraf')
    

    Next, define the percentile streams from the saved measurement stream. This is also where you should define the proper grouping, evaluations, etc.:

    // Window first, so that each percentile is computed over one minute of data.
    var firstPerc = myStream
        |window()
            .period(60s)
            .every(60s)
            .align()
        |percentile('myAggVal', 50.0)
        |eval(lambda: "percentile")
            .as('percentile')
    // Same window, different percentile.
    var secondPerc = myStream
        |window()
            .period(60s)
            .every(60s)
            .align()
        |percentile('myAggVal', 90.0)
        |eval(lambda: "percentile")
            .as('percentile')
    

    Finally, it's time to define the joined stream:

    var joinedStreams = firstPerc
        |join(secondPerc)
            .as('50', '90')
            .tolerance(1s)
            .streamName('measurementName')
        |influxDBOut()
            .database('myDBInInflux')
            .retentionPolicy('autogen')
            .create()
    

    The output:

    time                50.percentile 90.percentile
    

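    I strongly suggest using .tolerance(). It sets the maximum time difference at which two incoming points are still treated as having the same timestamp, so points that fall within the same tolerance period get joined.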
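
    To get the column names from the question (50p, 90p) instead of 50.percentile and 90.percentile, one option is to rename the joined fields with an eval node before writing them out. The following is a minimal sketch, not a tested task. The variable names p50 and p90, the join prefixes and the field name 'value' are made up for illustration, and it assumes that the window is placed before the percentile so that each value covers one minute of data:

    var myStream = stream
        |from()
            .measurement('my_tmp_measurement_from_telegraf')

    // One branch per percentile, each computed over a 60s window.
    var p50 = myStream
        |window()
            .period(60s)
            .every(60s)
            .align()
        |percentile('myAggVal', 50.0)
            .as('value')

    var p90 = myStream
        |window()
            .period(60s)
            .every(60s)
            .align()
        |percentile('myAggVal', 90.0)
            .as('value')

    p50
        |join(p90)
            // After the join, the fields are named 'p50.value' and 'p90.value'.
            .as('p50', 'p90')
            .tolerance(1s)
            .streamName('measurementName')
        // Rename the prefixed fields to the column names from the question.
        |eval(lambda: "p50.value", lambda: "p90.value")
            .as('50p', '90p')
        |influxDBOut()
            .database('myDBInInflux')
            .retentionPolicy('autogen')
            .create()

    Note that eval keeps only the fields it produces unless you add .keep(). For more percentiles, the same pattern should extend by adding another branch and passing it to the join as well, e.g. |join(p90, p99) with a matching third name in .as().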