Lets say we have:
Now we have a 3 steps here. Is there a way how these steps can be run in parallel? I mean after some time it should: grab HTML and simultaneously processing html + getting tags content and also simultaneously saving data into database from item that was processed already.(hopefully its obvious what I mean here) This way we can have parallel processing. As default, what I can see, mutiny does it in serial manner.
Here is an example:
@Test
public void test3() {
Multi<String> source = Multi.createFrom().items("a", "b", "c");
source
.onItem().transform(i -> trans(i, "-step1"))
.onItem().transform(i -> trans(i, "-step2"))
.onItem().transform(i -> trans(i, "-step3"))
.subscribe().with(item -> System.out.println("Subscriber received " + item));
}
private String trans(String s, String add) {
int t = new Random().nextInt(4) * 1000;
try {
print("Sleeping for '" + s + "' miliseconds: " + t);
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + add;
}
Now this reports following console output:
Sleeping for 'a' miliseconds: 2000
Sleeping for 'a-step1' miliseconds: 3000
Sleeping for 'a-step1-step2' miliseconds: 3000
Subscriber received a-step1-step2-step3
Sleeping for 'b' miliseconds: 0
Sleeping for 'b-step1' miliseconds: 0
Sleeping for 'b-step1-step2' miliseconds: 0
Subscriber received b-step1-step2-step3
Sleeping for 'c' miliseconds: 1000
Sleeping for 'c-step1' miliseconds: 3000
Sleeping for 'c-step1-step2' miliseconds: 3000
Subscriber received c-step1-step2-step3
One can see that its not running in parallel. What did I miss here?
As @jponge mentioned, you can collect your items in some List<Uni<String>>
and then call
Uni.combine().all().unis(listOfUnis).onitem().subscribe().with()
List<Uni<String>> listOfUnis = new ArrayList<>();
Multi<String> source = Multi.createFrom().items("a", "b", "c");
source
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step1")))
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step2")))
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step3")))
// do not subscribe on Multis here
one more note here - if you are going to make HTTP calls, better add
.emitOn(someBlockingPoolExecutor)
since you don't want to block Netty threads waiting for http calls to complete