Search code examples
javaspringreactive-programmingproject-reactorreactor

Identifying the origin of byte arrays in a reactive chain in Project Reactor


I am constructing a color image from 3 grayscale images. I make 3 separate (parallel) requests for each color band using rsocket, which returns a Mono<byte[]>. I need to collect all 3 byte arrays as a list so that I can build the the image properly. However, I'm confused as to how I am able to identify which band the byte array belonged to in the next reactive operation.

Here is some sample code:

 return Flux.just(redRequest, blueRequest, greenRequest)
                .parallel()
                .flatMap(client::getBytes) // calls the remote service to get the bytes for the given color band
                .sequential()
                .collectList()
                .map(response -> {
                   byte[] redBytes = response.get(0); //???
                }

I created a wrapper class to hold the original request, byte array, and band identifier so I could pass all the objects around in each operation, but because the response from my rsocket client is a Mono, I can only materialize (probably wrong terminology) the byte array by calling it in a map or flatMap, at which point I don't have access to my wrapper class and I'm not sure which band the byte array belongs to.

Can I fix this by simply not calling the client in a parallel request? Would I ever be guaranteed that items will propagate through the chain in the order I defined them in the Flux.just().

Basically in the last map, I just really want to be able to know which byte array belongs to which color band.


Solution

  • Wrap each request in the object with a color field and change the flatMap piece of your pipeline that sends these requests:

    Flux.just(
      new Request(redRequest, RED),
      new Request(blueRequest, BLUE),
      new Request(greenRequest, GREEN)
    )
    .parallel()
    .flatMap(request -> 
      client.getBytes(request)
        .map(response -> new Response(response.get(0), request.color))
    )
    .sequential()
    .collectList()
    .map(response -> {
      byte[] redBytes = response.bytes;
      Color color = response.color;
      // 
    })