I'm using Reactor 2.0.0.M1, and I'm trying to filter a Stream
. Depending on the results of my boolean operation, I want to either continue with one or another stream. This seems to be possible with the otherwise()
function, but its not exactly clear how to use it.
My Stream looks something like this:
stream.filter(o -> o.isValid());
To handle the case where o.isValid()
is true, my understanding is I can just call .map()
to continue down the stream.
To handle the case where o.isValid()
is false, I can access the alternate .otherwise()
stream.
But there doesn't seem to be an or()
or similar method, so it doesn't seem possible to configure both streams in a completely fluid way.
The best I can come up with is something like this:
FilterAction<Object> filterAction = stream.filter(o -> o.isValid());
// Returns a 'true' Stream, which might additional operations
filterAction
.map(o -> trueOperation1(o))
.map(o -> trueOperation2(o));
// Returns a 'false' Stream, which might different additional operations
filterAction.otherwise()
.map(o -> falseOperation1(o))
.map(o -> falseOperation2(o));
Is that really the best way to proceed?
I solved this by using groupBy() and flatMap().
Here is an example:
// your initial stream
Broadcaster<Object> stream = Streams.<Object>broadcast(environment);
stream
.groupBy(o -> o.isValid())
.flatMap(groupedStream -> {
if (groupedStream.key()) {
return groupedStream.map(o -> trueOperation(o));
} else {
return groupedStream.map(o -> falseOperation(o));
}
.map(o -> additionalOperations();
What happens here is that groupBy() converts your Stream into a Stream<GroupedStream<O>>
. In other words, a stream of streams of objects. Each inner stream contains a group of objects that were bucketed by the operation in the groupBy() call. In my case, I've filtered the Objects into true
and false
buckets.
Next, flatMap() takes the multiple streams, processes them, and then flattens the output back into a single Stream<Object>
. Within flatMap(), you can check the Stream's key(), and perform additional operations on the stream based on the key().
Then after the flatMap() finishes, you have a Stream again, and can do any post processing you want.