Using cyclops-react 1.0.0-RC3, I tried to recreate the examples on the cyclops-react streams user guide with batching. I found that some methods were missing from ReactiveSeq
, including batchBySize
and windowByTime
.
I did find these methods on StreamUtils
and they worked as expected, but didn't look quite as slick as the examples in the user guide...
// Example 19. Batch by size example
ReactiveSeq.of(1,2,3,4,5, 6)
.map(n-> n==6? sleep(1) : n)
.batchBySize(4) // this function seems to be missing...
.toList()
import com.aol.cyclops.control.ReactiveSeq;
// ...
StreamUtils.batchBySize(
ReactiveSeq.of(1, 2, 3, 4, 5, 6)
.map(n -> TestUtils.mayBeSlow(n)),
4)
.collect(Collectors.toList());
You can see my code in a working JUnit in the testBatchingSlidingWindowing
method test class StreamsTest.java
Should I expect to find the batchBySize
and windowByTime
on ReactiveSeq
or is using StreamUtils
the appropriate way?
Use grouped instead. It is available on all cyclops-react Traversable types (such as ListX, SetX, QueueX, DequeX, ReactiveSeq and others). So your example would become
ReactiveSeq.of(1,2,3,4,5, 6)
.map(n-> n==6? sleep(1) : n)
.grouped(4)
.toList()
The groupedXXX operators act like both batchByXXX and windowByXXX providing access to the grouped data via an extended Collection type, which itself has all of the traversable & foldable operators.
E.g. to double e.g. member of the group / batched list
ReactiveSeq.of(1,2,3,4,5, 6)
.map(n-> n==6? sleep(1) : n)
.grouped(4)
.map(list-> list.map(i->i*2))
.toList()
You can also use groupedT which returns a ListTransformer. ListTransformers allow you to manipulate nested structures as if they were unnested.
E.g. to double e.g. member of the group / batched list using groupedT
ReactiveSeq.of(1,2,3,4,5, 6)
.map(n-> n==6? sleep(1) : n)
.groupedT(4)
.map(i->i*2);
And to convert a ListTransformer back to a Stream of Lists
ListTSeq<Integer> listT = ReactiveSeq.of(1,2,3,4,5, 6)
.map(n-> n==6? sleep(1) : n)
.groupedT(4);
ReactiveSeq<ListX<Integer>> nested = listT.toNestedListX()
.stream();