I am constructing a color image from 3 grayscale images. I make 3 separate (parallel) requests, one for each color band, using rsocket, and each request returns a Mono<byte[]>. I need to collect all 3 byte arrays as a list so that I can build the image properly. However, I'm confused as to how I can identify which band each byte array belongs to in the next reactive operation.
Here is some sample code:
return Flux.just(redRequest, blueRequest, greenRequest)
    .parallel()
    .flatMap(client::getBytes) // calls the remote service to get the bytes for the given color band
    .sequential()
    .collectList()
    .map(response -> {
        byte[] redBytes = response.get(0); //???
    });
I created a wrapper class to hold the original request, the byte array, and a band identifier so that I could pass everything along through each operation. But because the response from my rsocket client is a Mono, I can only materialize (probably wrong terminology) the byte array inside a map or flatMap, at which point I no longer have access to my wrapper class and can't tell which band the byte array belongs to.
Can I fix this by simply not calling the client in parallel? Would I be guaranteed that items propagate through the chain in the order I defined them in Flux.just()?
Basically in the last map, I just really want to be able to know which byte array belongs to which color band.
Wrap each request in an object with a color field and change the flatMap piece of your pipeline that sends these requests, so that each response is tagged with its color while the request is still in scope:
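Flux.just(
        new Request(redRequest, RED),
        new Request(blueRequest, BLUE),
        new Request(greenRequest, GREEN)
    )
    .parallel()
    .flatMap(request ->
        // "payload" is a hypothetical field holding the original request
        client.getBytes(request.payload)
            // the Mono emits the byte[] itself, so tag it with the color here
            .map(bytes -> new Response(bytes, request.color))
    )
    .sequential()
    .collectList()
    .map(responses -> {
        // responses is a List<Response>; each element carries its own color
        for (Response response : responses) {
            byte[] bytes = response.bytes;
            Color color = response.color;
            // assign bytes to the band identified by color
        }
        // build and return the image here
    })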
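The tagging idea itself does not depend on Reactor. As a rough, runnable sketch, here is the same pattern with java.util.concurrent.CompletableFuture standing in for Mono, so it compiles with only the JDK. The names BandTagging, Band, TaggedBytes, getBytes, and fetchAllBands are all made up for illustration, and getBytes is a fake stand-in for the remote call, not the rsocket client:

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class BandTagging {
    enum Band { RED, GREEN, BLUE }

    // Pairs a payload with the band it was requested for.
    static final class TaggedBytes {
        final Band band;
        final byte[] bytes;

        TaggedBytes(Band band, byte[] bytes) {
            this.band = band;
            this.bytes = bytes;
        }
    }

    // Hypothetical stand-in for the remote call: returns a single byte
    // derived from the band so the results can be told apart.
    static CompletableFuture<byte[]> getBytes(Band band) {
        return CompletableFuture.supplyAsync(
            () -> new byte[] { (byte) (band.ordinal() + 1) });
    }

    // Starts all requests, tags each result while the band is still in
    // scope, then collects the results into a map keyed by band.
    static Map<Band, byte[]> fetchAllBands() {
        List<CompletableFuture<TaggedBytes>> pending = new ArrayList<>();
        for (Band band : Band.values()) {
            pending.add(getBytes(band)
                .thenApply(bytes -> new TaggedBytes(band, bytes)));
        }

        Map<Band, byte[]> byBand = new EnumMap<>(Band.class);
        for (CompletableFuture<TaggedBytes> future : pending) {
            TaggedBytes tagged = future.join(); // wait for this request
            byBand.put(tagged.band, tagged.bytes);
        }
        return byBand;
    }

    public static void main(String[] args) {
        Map<Band, byte[]> byBand = fetchAllBands();
        System.out.println("red payload length: " + byBand.get(Band.RED).length);
    }
}
```

Because every result carries its own band, the lookup at the end does not rely on completion order. Collecting into a map keyed by band (rather than a list) is one way to avoid positional access like get(0).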