Given some code using streams to process a large number of items, what's the best way to instrument the various steps for logging and performance/profiling?
Actual example:
ReactiveSeq.fromStream(pairs)
.filter(this::satisfiesThreshold)
.filter(this::satisfiesPersistConditions)
.map((pair) -> convertToResult(pair, jobId))
.flatMap(Option::toJavaStream)
.grouped(CHUNK_SIZE)
.forEach((chunk) ->
{
repository.save(chunk);
incrementAndReport();
});
reportProcessingTime();
Logging progress is important so I can trigger progress events in another thread that update a user interface.
Tracking the performance characteristics of the filtering and mapping steps in this stream is desireable to see where optimizations can be made to speed it up.
I see three options:
peek
around each step without actually using the valueWhich is the best? Any ideas on what #3 would look like? Is there another solution?
You have a couple of options here (if I have understood correctly) :-
We can make use of the elapsed operator to track the elapsed time between element emissions e.g.
ReactiveSeq.fromStream(Stream.of(1,2))
.filter(this::include)
.elapsed()
.map(this::logAndUnwrap)
Long[] filterTimeTakenMillis = new Long[maxSize];
int filterIndex = 0;
private <T> T logAndUnwrap(Tuple2<T, Long> t) {
//capture the elapsed time (t.v2) and then unwrap the tuple
filterTimeTakenMillis[filterIndex++]=t.v2;
return t.v1;
}
This will only work on cyclops-react Streams.
e.g.
ReactiveSeq.fromStream(Stream.of(1,2))
.filter(this::include)
.elapsed()
.map(this::logAndUnwrap)
.map(FluentFunctions.of(this::convertToResult)
.around(a->{
SimpleTimer timer = new SimpleTimer();
String r = a.proceed();
mapTimeTakenNanos[mapIndex++]=timer.getElapsedNanos();
return r;
}));
This will also work on vanilla Java 8 Streams.