java, java-stream, cyclops-react

How to instrument streams and track progress? (vanilla Java 8 or cyclops-react reactive streams)


Given some code that uses streams to process a large number of items, what's the best way to instrument the individual steps for logging and performance profiling?

Actual example:

  ReactiveSeq.fromStream(pairs)
                .filter(this::satisfiesThreshold)
                .filter(this::satisfiesPersistConditions)
                .map((pair) -> convertToResult(pair, jobId))
                .flatMap(Option::toJavaStream)
                .grouped(CHUNK_SIZE)
                .forEach((chunk) ->
                {
                    repository.save(chunk);
                    incrementAndReport();
                });
  reportProcessingTime();

Logging progress is important because I use it to trigger progress events on another thread, and those events update a user interface.
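A minimal sketch of the reporting side, assuming a hypothetical `ProgressTracker` class (not part of the question's code or any library): the stream thread only increments an `AtomicLong`, and a single scheduled thread periodically hands the current count to a listener, such as a callback that updates the UI.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical tracker: the stream thread calls increment(), while a
// separate scheduled thread periodically pushes the count to a listener.
final class ProgressTracker implements AutoCloseable {
    private final AtomicLong processed = new AtomicLong();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    ProgressTracker(LongConsumer listener, long periodMillis) {
        // Report the latest count every periodMillis on the scheduler thread
        scheduler.scheduleAtFixedRate(
                () -> listener.accept(processed.get()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Called from the stream, e.g. once per saved chunk with the chunk size
    long increment(long delta) {
        return processed.addAndGet(delta);
    }

    long current() {
        return processed.get();
    }

    // Stops the reporting thread
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In the question's `forEach`, `incrementAndReport()` could then become something like `tracker.increment(chunk.size())`. The listener runs on the scheduler thread, so a UI toolkit may require handing the update off to its own event thread.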

Tracking the performance characteristics of the filtering and mapping steps in this stream is also desirable, so I can see where optimizations would speed it up.

I see three options:

  1. put logging/profiling code in each function
  2. use peek around each step without actually using the value
  3. some sort of annotation based or AOP solution (no idea what)

Which is the best? Any ideas on what #3 would look like? Is there another solution?
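For reference, option 2 might look roughly like this in vanilla Java 8. The predicates and the mapping are hypothetical stand-ins for `satisfiesThreshold`, `satisfiesPersistConditions` and `convertToResult`; the point is that `peek` only observes elements, so a counter placed after each stage records how many items survived it. The `peek` Javadoc describes it as mainly a debugging aid, so the counters are only trustworthy with terminal operations that actually traverse the elements, such as `collect` or `forEach`.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Option 2 in plain Java 8: peek() observes elements without changing them,
// so a counter after each stage shows how many items passed that stage.
final class PeekCounting {
    final LongAdder afterThreshold = new LongAdder();
    final LongAdder afterPersist = new LongAdder();
    final LongAdder converted = new LongAdder();

    List<String> run(int n) {
        return IntStream.rangeClosed(1, n).boxed()
                .filter(i -> i % 2 == 0)        // hypothetical stand-in for satisfiesThreshold
                .peek(i -> afterThreshold.increment())
                .filter(i -> i % 5 == 0)        // hypothetical stand-in for satisfiesPersistConditions
                .peek(i -> afterPersist.increment())
                .map(i -> "result-" + i)        // hypothetical stand-in for convertToResult
                .peek(s -> converted.increment())
                .collect(Collectors.toList());
    }
}
```

`LongAdder` is used so the counters stay correct if the stream is ever made parallel.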


Solution

  • You have a couple of options here (if I have understood correctly):

    1. Use the elapsed operator to track the time elapsed between element emissions, e.g.

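        // Streams are lazy: nothing runs until a terminal operation such as forEach
        ReactiveSeq.fromStream(Stream.of(1, 2))
                   .filter(this::include)
                   .elapsed()                 // Tuple2<element, millis since previous emission>
                   .map(this::logAndUnwrap)
                   .forEach(System.out::println);

        // A growable list avoids the fixed-size array and the shared index counter
        private final List<Long> filterTimeTakenMillis = new ArrayList<>();

        private <T> T logAndUnwrap(Tuple2<T, Long> t) {
            // capture the elapsed time (t.v2) and then unwrap the tuple
            filterTimeTakenMillis.add(t.v2);
            return t.v1;
        }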
      

    This works only on cyclops-react streams, since elapsed() is not part of java.util.stream.Stream.

    2. Use the AOP-like functionality in FluentFunctions, e.g.

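    ReactiveSeq.fromStream(Stream.of(1, 2))
               .filter(this::include)
               .elapsed()
               .map(this::logAndUnwrap)
               // wrap convertToResult in "around" advice that times each call
               .map(FluentFunctions.of(this::convertToResult)
                                   .around(advice -> {
                                       long start = System.nanoTime();
                                       String result = advice.proceed(); // invokes convertToResult
                                       mapTimeTakenNanos.add(System.nanoTime() - start);
                                       return result;
                                   }))
               .forEach(System.out::println);

    private final List<Long> mapTimeTakenNanos = new ArrayList<>();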
    

    Because a FluentFunction is an ordinary java.util.function.Function, this approach also works with vanilla Java 8 streams.
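If you would rather not depend on cyclops-react, the same around-advice idea can be hand-rolled for plain `java.util.stream`. This is a sketch under assumptions, not a library API: `Timed` and its nested `Stats` are hypothetical names. Each wrapper records the call count and total nanoseconds spent inside the wrapped `Function` or `Predicate`, using `try`/`finally` so the timing is recorded even if the call throws.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical helpers: plain-Java "around advice" that times each call.
// LongAdder keeps the totals safe even if the stream runs in parallel.
final class Timed {
    static final class Stats {
        final LongAdder calls = new LongAdder();
        final LongAdder nanos = new LongAdder();
    }

    // Wraps a mapping step, e.g. convertToResult
    static <T, R> Function<T, R> function(Function<T, R> f, Stats stats) {
        return t -> {
            long start = System.nanoTime();
            try {
                return f.apply(t);
            } finally {
                stats.nanos.add(System.nanoTime() - start);
                stats.calls.increment();
            }
        };
    }

    // Wraps a filtering step, e.g. satisfiesThreshold
    static <T> Predicate<T> predicate(Predicate<T> p, Stats stats) {
        return t -> {
            long start = System.nanoTime();
            try {
                return p.test(t);
            } finally {
                stats.nanos.add(System.nanoTime() - start);
                stats.calls.increment();
            }
        };
    }
}
```

Applied to the pipeline in the question, a step would read something like `.filter(Timed.predicate(this::satisfiesThreshold, thresholdStats))`, where `thresholdStats` is a hypothetical `Timed.Stats` instance. Dividing `nanos.sum()` by `calls.sum()` then gives an average cost per element for that step.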