
Can each step of a Multi pipeline be processed in parallel?


Let's say we have:

  • a list of URLs that serves as the source for our Multi
  • as a first step, we fetch each page's HTML with an HTTP client call
  • then we look for a specific tag and extract its content
  • then we store what we found in a database

So we have three steps here. Is there a way to run these steps in parallel? I mean that, after a while, the pipeline should be fetching HTML for one item while simultaneously parsing the HTML and extracting the tag content for another, and saving the data of an already processed item to the database. (Hopefully it's clear what I mean.) That would give us parallel processing. By default, as far as I can see, Mutiny processes the items serially.

Here is an example:

  @Test
  public void test3() {
    Multi<String> source = Multi.createFrom().items("a", "b", "c");
    source
            .onItem().transform(i -> trans(i, "-step1"))
            .onItem().transform(i -> trans(i, "-step2"))
            .onItem().transform(i -> trans(i, "-step3"))
            .subscribe().with(item -> System.out.println("Subscriber received " + item));
  }


  private String trans(String s, String add) {
    int t = new Random().nextInt(4) * 1000;
    try {
      System.out.println("Sleeping for '" + s + "' miliseconds: " + t);
      Thread.sleep(t);
    } catch (InterruptedException e) {
      // restore the interrupt flag instead of swallowing the interruption
      Thread.currentThread().interrupt();
    }

    return s + add;
  }

This produces the following console output:

Sleeping for 'a' miliseconds: 2000
Sleeping for 'a-step1' miliseconds: 3000
Sleeping for 'a-step1-step2' miliseconds: 3000
Subscriber received a-step1-step2-step3
Sleeping for 'b' miliseconds: 0
Sleeping for 'b-step1' miliseconds: 0
Sleeping for 'b-step1-step2' miliseconds: 0
Subscriber received b-step1-step2-step3
Sleeping for 'c' miliseconds: 1000
Sleeping for 'c-step1' miliseconds: 3000
Sleeping for 'c-step1-step2' miliseconds: 3000
Subscriber received c-step1-step2-step3

As you can see, it's not running in parallel. What am I missing?


Solution

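  • As @jponge mentioned, you can wrap the work for each item in a Uni, collect those in a List<Uni<String>>, and then subscribe to the combination of all of them:

    List<Uni<String>> listOfUnis = new ArrayList<>();
    for (String item : List.of("a", "b", "c")) {
        listOfUnis.add(Uni.createFrom().item(() -> trans(item, "-step1"))
                .onItem().transform(s -> trans(s, "-step2"))
                .onItem().transform(s -> trans(s, "-step3"))
                // subscribe each Uni on a worker thread so that the items overlap;
                // someBlockingPoolExecutor is any Executor meant for blocking work
                .runSubscriptionOn(someBlockingPoolExecutor));
    }
    // subscribe only to the combined Uni, not to the individual ones
    Uni.combine().all().unis(listOfUnis)
            .combinedWith(results -> results) // `with(...)` in newer Mutiny versions
            .subscribe().with(results -> System.out.println("All done: " + results));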
    

    One more note: if you are going to make HTTP calls, it is better to add

    .emitOn(someBlockingPoolExecutor)

    before the blocking stage, since you don't want to block Netty threads while waiting for the HTTP calls to complete.
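
For comparison, here is a minimal JDK-only sketch of the same idea (one asynchronous chain per item, then wait for all of them). It uses CompletableFuture rather than Mutiny, so it illustrates the concurrency model and is not the Mutiny API. The class name ConcurrentPipelineSketch, the step helper (a stand-in for trans from the question, with a fixed 50 ms pause), and the pool size of 4 are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class ConcurrentPipelineSketch {

    // Hypothetical stand-in for trans(): appends a suffix after a short pause.
    static String step(String input, String suffix) {
        try {
            Thread.sleep(50); // simulated blocking work (HTTP call, parsing, DB write)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input + suffix;
    }

    // Runs the three steps in order for each item, while different items overlap in time.
    static List<String> processAll(List<String> items, ExecutorService pool) {
        List<CompletableFuture<String>> futures = items.stream()
                .map(item -> CompletableFuture
                        .supplyAsync(() -> step(item, "-step1"), pool)
                        .thenApplyAsync(s -> step(s, "-step2"), pool)
                        .thenApplyAsync(s -> step(s, "-step3"), pool))
                .collect(Collectors.toList());
        // join() blocks until each chain has finished; results keep the input order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // prints [a-step1-step2-step3, b-step1-step2-step3, c-step1-step2-step3]
            System.out.println(processAll(List.of("a", "b", "c"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Within a single item the three steps still run in order, but different items overlap. As long as the pool has at least as many threads as there are items, the total time should be close to that of the slowest item rather than the sum over all items.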