I have an rsocket endpoint that responds with a flux:
@MessageMapping("responses")
Flux<?> deal(@Payload String message) {
return myService.generateResponses(message);
}
The responses can be any of 3 different types of objects produced asynchronously using the following code (if it worked):
public Flux<?> generateResponses(String request) {
// Setup response sinks
final FluxProcessor publish = EmitterProcessor.create().serialize();
final FluxSink<Response1> sink1 = publish.sink();
final FluxSink<Response2> sink2 = publish.sink();
final FluxSink<Response3> sink3 = publish.sink();
// Get async responses: starts new thread to gather responses and update sinks
new MyResponses(request, sink1, sink2, sink3)
// Return the Flux
Flux<?> output = Flux
.from(publish
.log());
}
The problem is that when I populate the sinks with different objects only the first sink is actually publishing back to the subscriber.
public class MyResponses extends CacheListenerAdapter {
private FluxSink<Response1> sink1;
private FluxSink<Response2> sink2;
private FluxSink<Response3> sink3;
// Constructor is omitted for brevity
@Override
public void afterCreate(EntryEvent event) {
if (event.getNewValue() instanceof Response1) {
Response1 r1 = (Response1)event.getNewValue();
sink1.next(r1);
}
if (event.getNewValue() instanceof Response2) {
Response2 r2 = (Response2)event.getNewValue();
sink2.next(r2);
}
if (event.getNewValue() instanceof Response3) {
Response3 r3 = (Response3)event.getNewValue();
sink3.next(r3);
}
}
}
If I make the sinks of type <?>
then there's a .next
error:
The method next(capture#2-of ?) in the type FluxSink<capture#2-of ?> is not applicable for the arguments (Response1)
Is there a better approach to this requirement?
The reason this did not work with different object was to do with Spring Boot Data Geode serialization of underlying object types. The way to get the object Flux to work was use 1 sink of type <Object>
public Flux<Object> generateResponses(String request) {
// Setup the Flux
EmitterProcessor<Object> emitter = EmitterProcessor.create();
FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
// Get async responses: starts new thread to gather responses and update sinks
new MyResponses(request, sink)
// Setup an output Flux to publish the input Flux
Flux<Object> out = Flux
.from(emitter
.log(log.getName()));
}
The event handler then used the 1 sink
public class MyResponses extends CacheListenerAdapter {
private FluxSink<Object> sink;
// Constructor is omitted for brevity
@Override
public void afterCreate(EntryEvent event) {
if (event.getNewValue() instanceof Response1) {
Response1 r1 = (Response1)event.getNewValue();
sink.next(r1);
}
if (event.getNewValue() instanceof Response2) {
Response2 r2 = (Response2)event.getNewValue();
sink.next(r2);
}
if (event.getNewValue() instanceof Response3) {
Response3 r3 = (Response3)event.getNewValue();
sink.next(r3);
}
}
}