Search code examples
java-8java-streamstreamex

Should StreamEx parallelism work when using takeWhile?


I have a stream that I create like this:

StreamEx.generate(new MySupplier<List<Entity>>())
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .. // more stuff

I can change this to work in parallel by just adding parallel:

StreamEx.generate(new MySupplier<List<Entity>>())
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .parallel()
        .. // more stuff

But I also want to add a takeWhile condition to make the stream stop:

StreamEx.generate(new MySupplier<List<Entity>>())
        .takeWhile(not(List::isEmpty))
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .parallel()
        .. // more stuff

But as soon as I add the takeWhile the stream seems to become sequential (at least it's only processed by one thread). According to the javadoc of takeWhile, if I understand it correctly, should work with parallel streams. Am I doing something wrong or is this according to design?


Solution

  • As in normal Stream API if something works in parallel, it does not mean that it works efficiently. The javadoc states that:

    While this operation is quite cheap for sequential stream, it can be quite expensive on parallel pipelines.

    Actually you want to use takeWhile with unordered stream which could be optimized specially, but does not optimized currently, so this could be considered as a defect. I will try to fix this (I'm the StreamEx author).

    Update: fixed in version 0.6.5