spring, spring-boot, events, project-reactor, axon

Project Reactor: Using EmitterProcessor as an event bus


We are testing how to use EmitterProcessor as an event bus. Basically, we want to indirectly let the Spring controller know that an operation completed successfully. To do that, we emit a unique key that the controller listens for (so if a different key arrives, meant for a different method, the controller ignores it).

public class RequestVerifier {

    private final EmitterProcessor<String> emitterProcessor;
    // requestService is a field not shown in this snippet

    // constructor
    public RequestVerifier(EmitterProcessor<String> emitterProcessor) {
        this.emitterProcessor = emitterProcessor;
        // put this subscription here for testing purposes
        emitterProcessor.subscribe(value -> {
            System.out.println(value);
        });
    }

    public void runValue(RequestData data) {
        requestService.accept(data);
        // a new sink is requested from the processor on every call
        FluxSink<String> sink = emitterProcessor.sink();
        sink.next("SOME-KEY");
    }

}

@Configuration
public class AppConfiguration {

    @Bean(name = "flux-event-handler-event-bus-emitter")
    public EmitterProcessor<String> createEventBusEmitter(){
        EmitterProcessor<String> emitter = EmitterProcessor.create();
        return emitter;
    }

}

The problem is that only the first message reaches the subscriber registered with emitterProcessor.subscribe. When runValue is called a second time, and on every call after that, we get java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality). How do we allow the subscription to accept a continuous stream of data?

Also, if relevant: we plan to use this inside an Axon event handler, passing a message or key from the event handler to notify the waiting controller of the result to send back to the user.

UPDATE

We also tried the following, but it did not work either:

    @Bean(name = "flux-event-handler-event-bus-emitter")
    public EmitterProcessor<String> createEventBusEmitter(){
        EmitterProcessor<String> emitter = EmitterProcessor.create();
        return emitter;
    }

    @Bean(name = "event-autoconnector")
    public Flux<String> returnFlux(EmitterProcessor<String> emitter){
        return emitter.publish().autoConnect();
    }

Solution

  • In my playground Reactive CQRS repo, I experimented with wrapping a reactive API around an Axon Framework app.

    I think you may find what you are looking for here: https://github.com/stefanvozd/Reactive-CQRS. Be aware that it is still a work in progress and lacks comments and documentation.

    Depending on what you are trying to do with this stream, here are a few examples:

    1. An example of a REST controller that sends a command, waits for the projection to be materialized, and returns that view to the user in a non-blocking manner. For this, you should use subscription queries.
    2. An example of a reactive event bus stream that you can inject as a bean and attach to the event stream.
    3. An example of reactive event handlers that save projections using the reactive DB driver R2DBC, with backpressure support if you use Axon Server.

    Right now, these are all experimental examples. If you can provide more information about what you are trying to accomplish, I can offer some guidance.
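
As for the specific exception in the question, a likely cause is that runValue calls emitterProcessor.sink() on every invocation. Each call to sink() attaches a new upstream to the processor, so the second call appears to be what triggers Rule 2.12. The following is only a minimal sketch of one way around that, assuming a Reactor version that still ships EmitterProcessor (newer releases deprecate it in favor of the Sinks API). The class name KeyedEventBus and the methods publish and eventsFor are hypothetical and do not come from the question or from the linked repo.

```java
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

// Hypothetical wrapper: the name and methods are illustrative only.
public class KeyedEventBus {

    private final EmitterProcessor<String> processor = EmitterProcessor.create();

    // Obtain the sink exactly once and reuse it for every emission.
    private final FluxSink<String> sink = processor.sink();

    // Push a key into the shared stream.
    public void publish(String key) {
        sink.next(key);
    }

    // Each listener sees only the key it is waiting for and ignores the rest.
    public Flux<String> eventsFor(String key) {
        return processor.filter(key::equals);
    }
}
```

With something like this, a controller could subscribe to eventsFor("SOME-KEY") and take the first element (for example with next() plus a timeout), while the event handler calls publish("SOME-KEY"). Whether that fits depends on how subscribers come and go in your application, so treat it as a starting point rather than a finished design.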