apache-storm, trident

How to get the previous state before a counter update


For example, I have the following batches of user-impression tuples, with a batch size of 5:

Batch 1:
[UUID1, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID3, clientId2]

Batch 2:
[UUID4, clientId1]
[UUID5, clientId1]
[UUID5, clientId1]
[UUID6, clientId2]
[UUID6, clientId2]

This is how I save the count state:

TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));

Stream ClientStream = ClientState.newValuesStream();

I start with an empty database and run my topology. After grouping the stream by clientId, I save the state with persistentAggregate and the Count aggregator. For the first batch, newValuesStream() emits [clientId1, 4], [clientId2, 1]; for the second batch it emits [clientId1, 7], [clientId2, 3], as expected.
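To make these numbers concrete, here is a plain-Java model (no Storm dependency) of what groupBy + persistentAggregate(..., new Count(), ...) + newValuesStream() emit for the two example batches. It is only a sketch under stated assumptions: the HashMap stands in for the Cassandra-backed state, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RunningCountModel {

    // Hypothetical stand-in for the persistent (Cassandra) state: clientId -> stored count.
    private final Map<String, Long> store = new LinkedHashMap<>();

    // Processes one batch of [uuid, clientId] tuples and returns what newValuesStream()
    // would emit: one "clientId=count" entry per client seen in the batch.
    public List<String> processBatch(String[][] batch) {
        // Count tuples per clientId within this batch (the groupBy + Count step).
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String[] tuple : batch) {
            batchCounts.merge(tuple[1], 1L, Long::sum);
        }
        // Add the batch counts to the stored totals and emit the updated totals.
        List<String> newValues = new ArrayList<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            long updated = store.merge(e.getKey(), e.getValue(), Long::sum);
            newValues.add(e.getKey() + "=" + updated);
        }
        return newValues;
    }

    public static void main(String[] args) {
        String[][] batch1 = {
            {"UUID1", "clientId1"}, {"UUID2", "clientId1"}, {"UUID2", "clientId1"},
            {"UUID2", "clientId1"}, {"UUID3", "clientId2"}
        };
        String[][] batch2 = {
            {"UUID4", "clientId1"}, {"UUID5", "clientId1"}, {"UUID5", "clientId1"},
            {"UUID6", "clientId2"}, {"UUID6", "clientId2"}
        };
        RunningCountModel model = new RunningCountModel();
        System.out.println(model.processBatch(batch1)); // [clientId1=4, clientId2=1]
        System.out.println(model.processBatch(batch2)); // [clientId1=7, clientId2=3]
    }
}
```

The stored total only ever holds the updated value, which is why the pre-update counter is not directly visible downstream.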

ClientStream is used in a couple of branches. In one of them I need the count as it stood for each individual tuple, which would mean processing batches of size 1. A batch size of 1 is obviously not an option, so I need some way to find out the counter's previous value before it is updated and emit it together with the already-updated counter, e.g. [clientId1, 7, 4] for the second batch.

Does anybody have an idea how to do that?


Solution

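  • I solved this by adding a second aggregator that counts each client's tuples within the current batch, and joining its output with the new values of the persistent aggregate. The counter value before the update is then count - batchCount (for clientId1 in the second batch: 7 - 3 = 4):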

    TridentState ClientState = impressionStream
        .groupBy(new Fields("clientId"))
        .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
            "UserImpressionCounter"), new Count(), new Fields("count"));
    
    Stream ClientBatchAggregationStream = impressionStream
        .groupBy(new Fields("clientId"))
        .aggregate(new SumCountAggregator(), new Fields("batchCount"));
    
    Stream GroupingPeriodCounterStateStream = topology
        .join(ClientState.newValuesStream(), new Fields("clientId"),
            ClientBatchAggregationStream, new Fields("clientId"), 
            new Fields("clientId", "count", "batchCount"));
    

    SumCountAggregator:

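    // Package names below are for Storm 1.x and later; older releases (0.9.x)
    // use storm.trident.* and backtype.storm.tuple.* instead.
    import org.apache.storm.trident.operation.BaseAggregator;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    // Counts the tuples of each group within a single batch and emits that count when
    // the batch completes; unlike persistentAggregate, nothing is stored between batches.
    public class SumCountAggregator extends BaseAggregator<SumCountAggregator.CountState> {

        // Per-group, per-batch accumulator.
        static class CountState {
            long count = 0;
        }

        @Override
        public CountState init(Object batchId, TridentCollector collector) {
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            state.count += 1;
        }

        @Override
        public void complete(CountState state, TridentCollector collector) {
            collector.emit(new Values(state.count));
        }
    }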
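The join can be sanity-checked with a similar plain-Java model. It is a sketch, not Storm code: maps stand in for the two joined streams and for the persistent state, and the final subtraction (previous = count - batchCount) is an extra step assumed here, since the posted topology stops at the join and a downstream function would still have to compute it. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JoinPreviousCountModel {

    // Hypothetical stand-in for the persistent state: clientId -> total count so far.
    private final Map<String, Long> store = new LinkedHashMap<>();

    // Returns one "clientId,count,previous" row per client in the batch.
    public List<String> processBatch(String[] clientIds) {
        // Branch 1: per-batch count per client (what SumCountAggregator emits as batchCount).
        Map<String, Long> batchCount = new LinkedHashMap<>();
        for (String clientId : clientIds) {
            batchCount.merge(clientId, 1L, Long::sum);
        }
        // Branch 2: updated totals (what ClientState.newValuesStream() emits as count).
        Map<String, Long> count = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : batchCount.entrySet()) {
            count.put(e.getKey(), store.merge(e.getKey(), e.getValue(), Long::sum));
        }
        // Join on clientId; the counter value before this batch is count - batchCount.
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Long> e : count.entrySet()) {
            long previous = e.getValue() - batchCount.get(e.getKey());
            rows.add(e.getKey() + "," + e.getValue() + "," + previous);
        }
        return rows;
    }

    public static void main(String[] args) {
        JoinPreviousCountModel model = new JoinPreviousCountModel();
        // Only clientId matters for counting, so the UUIDs are omitted here.
        String[] batch1 = {"clientId1", "clientId1", "clientId1", "clientId1", "clientId2"};
        String[] batch2 = {"clientId1", "clientId1", "clientId1", "clientId2", "clientId2"};
        System.out.println(model.processBatch(batch1)); // [clientId1,4,0, clientId2,1,0]
        System.out.println(model.processBatch(batch2)); // [clientId1,7,4, clientId2,3,1]
    }
}
```

For the second batch this yields [clientId1, 7, 4], matching the output asked for in the question, without reading the state before the update.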