Search code examples
project-reactorrsocket

Publish FluxSink of different object types


I have an rsocket endpoint that responds with a flux:

@MessageMapping("responses")
Flux<?> deal(@Payload String message) {
    return myService.generateResponses(message);
}

The responses can be any of 3 different types of objects produced asynchronously using the following code (if it worked):

public Flux<?> generateResponses(String request) {
  // Setup response sinks
  final FluxProcessor publish = EmitterProcessor.create().serialize();
  final FluxSink<Response1> sink1 = publish.sink();    
  final FluxSink<Response2> sink2 = publish.sink();
  final FluxSink<Response3> sink3 = publish.sink();

  // Get async responses: starts new thread to gather responses and update sinks
  new MyResponses(request, sink1, sink2, sink3)

  // Return the Flux
  Flux<?> output = Flux
    .from(publish
    .log());
}

The problem is that when I populate the sinks with different objects only the first sink is actually publishing back to the subscriber.

public class MyResponses extends CacheListenerAdapter {
  private FluxSink<Response1> sink1;
  private FluxSink<Response2> sink2;
  private FluxSink<Response3> sink3;

  // Constructor is omitted for brevity

  @Override
  public void afterCreate(EntryEvent event) {       
    if (event.getNewValue() instanceof Response1) {  
        Response1 r1 = (Response1)event.getNewValue();
        sink1.next(r1);
    }
    if (event.getNewValue() instanceof Response2) {  
        Response2 r2 = (Response2)event.getNewValue();
        sink2.next(r2);
    }
    if (event.getNewValue() instanceof Response3) {  
        Response3 r3 = (Response3)event.getNewValue();
        sink3.next(r3);
    }
  }
}

If I make the sinks of type <?> then there's a .next error:

The method next(capture#2-of ?) in the type FluxSink<capture#2-of ?> is not applicable for the arguments (Response1)

Is there a better approach to this requirement?


Solution

  • The reason this did not work with different object was to do with Spring Boot Data Geode serialization of underlying object types. The way to get the object Flux to work was use 1 sink of type <Object>

    public Flux<Object> generateResponses(String request) {
      // Setup the Flux
      EmitterProcessor<Object> emitter = EmitterProcessor.create();
      FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
    
      // Get async responses: starts new thread to gather responses and update sinks
      new MyResponses(request, sink)
    
      // Setup an output Flux to publish the input Flux
      Flux<Object> out = Flux
        .from(emitter
        .log(log.getName()));
    }
    

    The event handler then used the 1 sink

    public class MyResponses extends CacheListenerAdapter {
      private FluxSink<Object> sink;
    
      // Constructor is omitted for brevity
    
      @Override
      public void afterCreate(EntryEvent event) {       
        if (event.getNewValue() instanceof Response1) {  
            Response1 r1 = (Response1)event.getNewValue();
            sink.next(r1);
        }
        if (event.getNewValue() instanceof Response2) {  
            Response2 r2 = (Response2)event.getNewValue();
            sink.next(r2);
        }
        if (event.getNewValue() instanceof Response3) {  
            Response3 r3 = (Response3)event.getNewValue();
            sink.next(r3);
        }
      }
    }