We are testing how to use EmitterProcessor as an event bus. Basically, we want to indirectly let the Spring controller know that something was done successfully. We pass a unique key that the controller listens for, so if a different key comes in for a different method, it is ignored.
public class RequestVerifier {
    // constructor
    public RequestVerifier(EmitterProcessor<String> emitterProcessor) {
        this.emitterProcessor = emitterProcessor;
        // put this subscription here for testing purposes
        emitterProcessor.subscribe(value -> {
            System.out.println(value);
        });
    }

    public void runValue(RequestData data) {
        requestService.accept(data);
        FluxSink<String> sink = emitterProcessor.sink();
        sink.next("SOME-KEY");
    }
}

@Configuration
public class AppConfiguration {
    @Bean(name = "flux-event-handler-event-bus-emitter")
    public EmitterProcessor<String> createEventBusEmitter() {
        EmitterProcessor<String> emitter = EmitterProcessor.create();
        return emitter;
    }
}
The problem is that only the first message gets passed through to the emitterProcessor.subscribe callback. When runValue is called a second time (and every time after that), we get java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality). How do we allow the subscription to accept a constant stream of data?
Also, in case it is relevant: we are planning to use this inside an Axon event handler, passing a message or key from the event handler to notify the waiting controller of the result to send back to the user.
UPDATE
I also tried the following, but it did not work:
@Bean(name = "flux-event-handler-event-bus-emitter")
public EmitterProcessor<String> createEventBusEmitter() {
    EmitterProcessor<String> emitter = EmitterProcessor.create();
    return emitter;
}

@Bean(name = "event-autoconnector")
public Flux<String> returnFlux(EmitterProcessor<String> emitter) {
    return emitter.publish().autoConnect();
}
In my playground Reactive CQRS repo, I experimented with wrapping a reactive API around an Axon Framework app.
I think you may find what you are looking for at https://github.com/stefanvozd/Reactive-CQRS, but be aware that it is still a work in progress and lacks comments and documentation.
Depending on what you are trying to do with this stream, here are a few examples
Right now, these are all experimental examples. If you can provide more information about what you are trying to accomplish, I can offer some guidance.
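As for the exception in the question: it is what Reactor reports when sink() is called on the same processor more than once, because each call subscribes the processor again. Below is a minimal sketch of one way around that, which is to obtain the sink once and reuse it. It assumes reactor-core 3.3.x, where EmitterProcessor is still available. The class EventBusSketch and its methods publish and listenFor are hypothetical names made up for illustration; they are not taken from the question or from the repo linked above.

```java
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.util.ArrayList;
import java.util.List;

// Hypothetical event bus wrapper; names are illustrative only.
final class EventBusSketch {

    private final EmitterProcessor<String> processor = EmitterProcessor.create();

    // sink() is called exactly once here and the result is reused.
    // Calling sink() again on the same processor is what triggers
    // the "Spec. Rule 2.12" IllegalStateException from the question.
    private final FluxSink<String> sink = processor.sink();

    // Push a key onto the bus.
    void publish(String key) {
        sink.next(key);
    }

    // Stream of events matching one key; all other keys are ignored.
    Flux<String> listenFor(String key) {
        return processor.filter(key::equals);
    }

    public static void main(String[] args) {
        EventBusSketch bus = new EventBusSketch();
        List<String> received = new ArrayList<>();

        // Stands in for the waiting controller.
        bus.listenFor("SOME-KEY").subscribe(received::add);

        // Repeated publishing works because the sink is reused.
        bus.publish("SOME-KEY");
        bus.publish("OTHER-KEY");
        bus.publish("SOME-KEY");

        System.out.println(received);
    }
}
```

In a Spring setup, the same idea would mean exposing the FluxSink (or a small wrapper like the one above) as a singleton bean instead of calling emitterProcessor.sink() inside runValue. Note that FluxSink is not designed for concurrent next() calls by default, so if several threads publish at once, some form of serialization would be needed.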