
How to unroll multiple Flux and keep the originals?


I want to generate an interleaved stream of two object types.

I have the two helper methods

  • Aaa genAaa() (or Mono<Aaa>?) to create one Aaa and
  • Flux<Bbb> genBbbs(Aaa a) to create a set of Bbbs from a.

The overall result should be a Flux<JsonNode> to accommodate the mixing of the two object types.

So, the result would be something like

[ {'name':'Aaa-X'}, 
    {'name':'Bbb-x1'},
    {'name':'Bbb-x2'},
  {'name':'Aaa-Y'},
    {'name':'Bbb-y1'},
    {'name':'Bbb-y2'},
    {'name':'Bbb-y3'}
]

As a rough sketch I tried this:

final ObjectMapper om = new ObjectMapper();

public Flux<JsonNode> create() {
  return Flux.range(0, 2)       // create 2
    .map( idx -> genAaa() )     // bare Aaa's
    .flatMap( a -> genBbbs(a) ) // bare Aaa to Flux<Bbb> ???
    .map( om::valueToTree );    // anything to JsonNode
}

But I have one big issue here:

Because I transform the Aaa objects (and thus consume them) they are not in the result anymore. I have no idea how I can "use" and keep them in this scenario.

I wondered whether I could pass the "flux in progress" as a parameter to the generate functions, so that each of them adds JsonNodes as they are created, but that feels wrong (totally not async) and I wouldn't know how to do it anyway. I suppose there is a concept in Fluxes that still eludes me.


Solution

  • You can use Flux#concat together with the getBbbs method (genBbbs in the question) inside the function passed to flatMap:

    private static Flux<JsonNode> combine() {
        ObjectMapper objectMapper = new ObjectMapper();
        return Flux.concat(getAaa("a0"), getAaa("a1")) // Flux<Aaa>
                .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
                .map(objectMapper::valueToTree); // Flux<JsonNode>
    }
    

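    The concat method simply concatenates two sources, in order:

    1. A Mono<Aaa> wrapping the current element, created artificially with Mono.just
    2. The Flux<Bbb> returned by the getBbbs(aaa) invocation

    Note that flatMap subscribes to the inner publishers eagerly, so if they emit asynchronously the groups for different Aaa elements may interleave. If each Aaa must be followed strictly by its own Bbbs, concatMap is the order-preserving alternative.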

    Example output:

    {"name":"a0"}
    {"name":"B1-a0"}
    {"name":"B2-a0"}
    {"name":"a1"}
    {"name":"B1-a1"}
    {"name":"B2-a1"}
    

    Full listing:

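    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class Main {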
        @AllArgsConstructor
        @Data
        private static class Aaa {
            private String name;
        }
    
        @AllArgsConstructor
        @Data
        private static class Bbb {
            private String name;
        }
    
        private static Mono<Aaa> getAaa(String name) {
            return Mono.just(new Aaa(name));
        }
    
        private static Flux<Bbb> getBbbs(Aaa aaa) {
            return Flux.just(new Bbb("B1-" + aaa.getName()), new Bbb("B2-" + aaa.getName()));
        }
    
    
        public static void main(String[] args) {
            combine().subscribe(System.out::println);
        }
    
        private static Flux<JsonNode> combine() {
            ObjectMapper objectMapper = new ObjectMapper();
            return Flux.concat(getAaa("a0"), getAaa("a1")) // Flux<Aaa>
                    .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
                    .map(objectMapper::valueToTree); // Flux<JsonNode>
        }
    }
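
The "emit the parent, then its children" shape used in combine() can also be tried without Reactor or Jackson on the classpath. The sketch below is only an analogy built on java.util.stream, not the answer's code: the class InterleaveSketch and the helper children are hypothetical names, and plain strings stand in for Aaa and Bbb.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InterleaveSketch {

    // Hypothetical stand-in for getBbbs: derives two child names from a parent name.
    static Stream<String> children(String parent) {
        return Stream.of("B1-" + parent, "B2-" + parent);
    }

    // For each parent, emit the parent itself first and then its children.
    // Stream.concat plays the role that Flux.concat plays in combine().
    static List<String> interleave(List<String> parents) {
        return parents.stream()
                .flatMap(p -> Stream.concat(Stream.of(p), children(p)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        interleave(Arrays.asList("a0", "a1")).forEach(System.out::println);
    }
}
```

Running main prints a0, B1-a0, B2-a0, a1, B1-a1, B2-a1, one per line. Because a sequential Stream is synchronous, this only models the shape of the result, not Reactor's asynchronous behaviour.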