For example, I have these batches of user-impression tuples, with a batch size of 5:
Batch 1:
[UUID1, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID2, clientId1]
[UUID3, clientId2]
Batch 2:
[UUID4, clientId1]
[UUID5, clientId1]
[UUID5, clientId1]
[UUID6, clientId2]
[UUID6, clientId2]
And this is how I save the count state:
TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));
Stream ClientStream = ClientState.newValuesStream();
I start with a clean database and run my topology. After grouping the stream by clientId, I save the state with the persistentAggregate function and the Count aggregator.
For the first batch, newValuesStream emits [clientId1, 4], [clientId2, 1].
For the second batch it emits [clientId1, 7], [clientId2, 3], as expected.
ClientStream is used in a couple of branches, and in one of these branches I would need batches of size 1, because I need the count for each individual tuple.
A batch size of 1 is obviously a bad idea, so instead I need to find out the previous value of the counter before it is updated and emit it together with the tuple that already carries the updated counter, e.g. [clientId1, 7, 4] for the second batch.
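To make the requirement concrete, here is a minimal plain-Java sketch (no Storm dependencies) of the desired output: for each clientId in a batch, read the stored total before adding that batch's count, then emit [clientId, updatedCount, previousCount]. The class name PreviousCountSketch and the HashMap standing in for the Cassandra state are hypothetical and chosen only for illustration; this is not a Trident API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: a HashMap stands in for the Cassandra-backed
// state, and applyBatch() plays the role of one Trident batch.
class PreviousCountSketch {
    private final Map<String, Long> store = new HashMap<>();

    // Returns one row [clientId, updatedCount, previousCount] per clientId in the batch.
    List<Object[]> applyBatch(List<String> clientIds) {
        // Count how many tuples each clientId contributes in this batch.
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String id : clientIds) {
            batchCounts.merge(id, 1L, Long::sum);
        }
        List<Object[]> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            long previous = store.getOrDefault(e.getKey(), 0L); // value before this batch
            long updated = previous + e.getValue();              // value after this batch
            store.put(e.getKey(), updated);
            out.add(new Object[] {e.getKey(), updated, previous});
        }
        return out;
    }
}
```

Feeding it batch 1 and then batch 2 from above, the second call yields [clientId1, 7, 4] and [clientId2, 3, 1].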
Does anybody have an idea how to do that?
I solved this issue by adding a second, per-batch aggregation and joining it with the new-values stream of the persistent aggregate:
TridentState ClientState = impressionStream
    .groupBy(new Fields("clientId"))
    .persistentAggregate(getCassandraStateFactory("users", "DataComputation",
        "UserImpressionCounter"), new Count(), new Fields("count"));
Stream ClientBatchAggregationStream = impressionStream
    .groupBy(new Fields("clientId"))
    .aggregate(new SumCountAggregator(), new Fields("batchCount"));
Stream GroupingPeriodCounterStateStream = topology
    .join(ClientState.newValuesStream(), new Fields("clientId"),
        ClientBatchAggregationStream, new Fields("clientId"),
        new Fields("clientId", "count", "batchCount"));
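The joined stream carries [clientId, count, batchCount], where count is the total after the batch and batchCount is the number of tuples the current batch contributed, so the counter's value before the update is count - batchCount (for clientId1 in batch 2: 7 - 3 = 4). The plain-Java sketch below imitates that inner join on clientId followed by the subtraction; the class and method names are hypothetical, and in the topology itself the subtraction would presumably be done by a function applied to the joined stream with each().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only (no Storm): newValues holds the updated totals
// per clientId, batchCounts holds the per-batch counts per clientId.
class JoinPreviousCountSketch {
    // Returns rows [clientId, count, batchCount, previousCount].
    static List<Object[]> join(Map<String, Long> newValues, Map<String, Long> batchCounts) {
        List<Object[]> rows = new ArrayList<>();
        for (Map.Entry<String, Long> e : newValues.entrySet()) {
            Long batchCount = batchCounts.get(e.getKey());
            if (batchCount == null) {
                continue; // inner join: drop clientIds missing from either side
            }
            long count = e.getValue();
            // The total before this batch is the new total minus what this batch added.
            rows.add(new Object[] {e.getKey(), count, batchCount, count - batchCount});
        }
        return rows;
    }
}
```

With the second batch's values, the rows come out as [clientId1, 7, 3, 4] and [clientId2, 3, 2, 1].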
SumCountAggregator:
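// Imports assume Storm 1.x or later; on Storm 0.9.x the same classes live in
// storm.trident.operation, storm.trident.tuple and backtype.storm.tuple.
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// Counts the tuples of each group within a single batch and emits that count.
public class SumCountAggregator extends BaseAggregator<SumCountAggregator.CountState> {
    // Per-group, per-batch accumulator.
    static class CountState {
        long count = 0;
    }

    // Start each group of each batch from zero.
    @Override
    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    // Called once per tuple in the group.
    @Override
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count += 1;
    }

    // Called once the group's tuples for this batch are processed: emit the batch count.
    @Override
    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}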
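As I understand Trident's aggregator contract, when aggregate() runs on a grouped stream, init() is called once per group per batch, aggregate() once per tuple, and complete() once per group at the end of the batch, which is why SumCountAggregator emits a per-batch count rather than a running total. The sketch below imitates that lifecycle in plain Java; the driver loop and the class name SumCountLifecycleSketch are assumptions for illustration, not Storm code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical driver imitating how a grouped aggregate() calls the three
// aggregator methods within ONE batch; Trident's real executor is more involved.
class SumCountLifecycleSketch {
    static class CountState {
        long count = 0;
    }

    static CountState init() { return new CountState(); }            // once per group per batch

    static void aggregate(CountState state) { state.count += 1; }    // once per tuple

    static long complete(CountState state) { return state.count; }   // once per group, emits

    // Returns the emitted batchCount per clientId for one batch.
    static Map<String, Long> runBatch(List<String> clientIds) {
        Map<String, CountState> states = new LinkedHashMap<>();
        for (String id : clientIds) {
            aggregate(states.computeIfAbsent(id, k -> init()));
        }
        Map<String, Long> emitted = new LinkedHashMap<>();
        for (Map.Entry<String, CountState> e : states.entrySet()) {
            emitted.put(e.getKey(), complete(e.getValue()));
        }
        return emitted;
    }
}
```

For batch 2 above it produces {clientId1=3, clientId2=2}; joined with [clientId1, 7] and [clientId2, 3], these give previous counts of 4 and 1.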