Storm Trident 'Average' aggregator


I am a newbie to Trident and I'm looking to create an 'Average' aggregator similar to Sum(), but for averages. The following does not work:

    public class Average implements CombinerAggregator<Long>.......{

        public Long init(TridentTuple tuple)
        {
            (Long)tuple.getValue(0);
        }
        public Long Combine(long val1,long val2){
            return val1+val2/2;
        }
        public Long zero(){
            return 0L;
        }
    }

It may not be syntactically correct, but that's the idea. Please help if you can. Given two tuples with values [2,4,1] and [2,2,5] and fields 'a', 'b' and 'c', averaging field 'b' should return 3. I'm also not entirely sure how init() and zero() work.

Thank you so much for your help in advance.

Eli


Solution

  • I am a complete newbie when it comes to Trident as well, so I'm not entirely sure whether the following will work. But it might:

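    // Package names below are for Storm 1.0+; older releases use
    // storm.trident.* and backtype.storm.tuple.Values instead.
    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    // The nested type must be qualified in the extends clause.
    public class AvgAgg extends BaseAggregator<AvgAgg.AvgState> {
        // Running totals for the tuples seen so far.
        static class AvgState {
            long count = 0;
            long total = 0;

            double getAverage() {
                // Cast before dividing so the result is not truncated;
                // an empty state reports 0.0 (a choice, not a Trident rule).
                return count == 0 ? 0.0 : (double) total / count;
            }
        }

        @Override
        public AvgState init(Object batchId, TridentCollector collector) {
            return new AvgState();
        }

        @Override
        public void aggregate(AvgState state, TridentTuple tuple, TridentCollector collector) {
            state.count++;
            // Add the tuple's value (not 1) to the running total.
            state.total += ((Number) tuple.getValue(0)).longValue();
        }

        @Override
        public void complete(AvgState state, TridentCollector collector) {
            collector.emit(new Values(state.getAverage()));
        }
    }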
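
On the init() and zero() part of the question: in a CombinerAggregator, init() is called once per input tuple to produce a partial result, combine() merges two partial results, and zero() supplies the result when there are no tuples at all. That is also why a CombinerAggregator<Long> cannot compute an average directly. `val1+val2/2` parses as `val1 + (val2/2)`, and even `(val1+val2)/2` goes wrong with three or more values, because averaging pairwise depends on the order in which values are merged. One way around it is to combine (sum, count) pairs and divide only at the end. The following is a minimal, dependency-free sketch of that idea, not Trident code: `AverageSketch` and `Partial` are hypothetical names, and init() takes a plain long here where Trident would pass a TridentTuple.

```java
import java.util.List;

// Stand-in that mirrors the init/combine/zero shape of a CombinerAggregator
// without depending on Storm. All names here are illustrative assumptions.
class AverageSketch {

    // Partial result: carries sum and count so that merging stays exact.
    static final class Partial {
        final long sum;
        final long count;

        Partial(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        // Divide once, at the very end; an empty partial reports 0.0 by choice.
        double average() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    // init: turn one input value into a partial result.
    static Partial init(long value) {
        return new Partial(value, 1);
    }

    // combine: merge two partial results; the grouping of merges does not matter.
    static Partial combine(Partial a, Partial b) {
        return new Partial(a.sum + b.sum, a.count + b.count);
    }

    // zero: the identity element, used when there is nothing to aggregate.
    static Partial zero() {
        return new Partial(0, 0);
    }

    // Folds a list of values the way repeated combine() calls would.
    static double averageOf(List<Long> values) {
        Partial acc = zero();
        for (long v : values) {
            acc = combine(acc, init(v));
        }
        return acc.average();
    }

    public static void main(String[] args) {
        System.out.println(averageOf(List.of(4L, 2L)));     // field 'b' from the question: prints 3.0
        System.out.println(averageOf(List.of(4L, 2L, 6L))); // prints 4.0
    }
}
```

If this were ported to a real CombinerAggregator, the combined value would be the (sum, count) pair, so a later step would still need to turn it into the average. The BaseAggregator version above avoids that extra step and would typically be wired in with something like `stream.aggregate(new Fields("b"), new AvgAgg(), new Fields("avg"))`.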