Tags: java, java-8, reactive-streams, cyclops-react

cyclops-react: No batching functions on ReactiveSeq?


Using cyclops-react 1.0.0-RC3, I tried to recreate the batching examples from the cyclops-react streams user guide. I found that some methods were missing from ReactiveSeq, including batchBySize and windowByTime.

I did find these methods on StreamUtils and they worked as expected, but didn't look quite as slick as the examples in the user guide...

From the user guide:

// Example 19. Batch by size example
ReactiveSeq.of(1,2,3,4,5, 6)
  .map(n-> n==6? sleep(1) : n)
  .batchBySize(4) // this function seems to be missing...
  .toList()

What I could get working:

import java.util.stream.Collectors;

import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.util.stream.StreamUtils;
// ...
StreamUtils.batchBySize(
    ReactiveSeq.of(1, 2, 3, 4, 5, 6)
        .map(n -> TestUtils.mayBeSlow(n)),
    4)
    .collect(Collectors.toList());

My code is in a working JUnit test: see the testBatchingSlidingWindowing method in the test class StreamsTest.java.

Should I expect to find batchBySize and windowByTime on ReactiveSeq, or is using StreamUtils the appropriate way?


Solution

  • Use grouped instead. It is available on all cyclops-react Traversable types (such as ListX, SetX, QueueX, DequeX, ReactiveSeq and others). Your example would become:

    ReactiveSeq.of(1,2,3,4,5, 6)
               .map(n-> n==6? sleep(1) : n)
               .grouped(4) 
               .toList()
    

    The groupedXXX operators act like both batchByXXX and windowByXXX, providing access to the grouped data via an extended Collection type, which itself has all of the traversable and foldable operators.

    E.g. to double each member of the group / batched list:

     ReactiveSeq.of(1,2,3,4,5, 6)
               .map(n-> n==6? sleep(1) : n)
               .grouped(4) 
               .map(list-> list.map(i->i*2))
               .toList() 
    

    You can also use groupedT, which returns a ListTransformer. ListTransformers allow you to manipulate nested structures as if they were unnested.

    E.g. to double each member of the group / batched list using groupedT:

    ReactiveSeq.of(1,2,3,4,5, 6)
               .map(n-> n==6? sleep(1) : n)
               .groupedT(4) 
               .map(i->i*2);
    

    And to convert a ListTransformer back to a Stream of Lists:

    ListTSeq<Integer> listT = ReactiveSeq.of(1,2,3,4,5, 6)
                                         .map(n-> n==6? sleep(1) : n)
                                         .groupedT(4);
    
    ReactiveSeq<ListX<Integer>> nested = listT.toNestedListX()
                                              .stream();