Search code examples
javaspringreactorproject-reactor

How to properly use a Reactor Publisher


I can´t figure out how to properly implement a Publisher/Subscriber scenario with Reactor. I have a working solution, but the implementation does not seem correct to me:

My problem is that I need to manually implement the publisher to register the subscriber and pass the events:

public void publishQuotes(String ticker) throws InterruptedException {

// [..] Here I generate some "lines" to be publisher

for (Subscriber<? super String> subscriber : subscribers) {
    lineList.forEach(line -> subscriber.onNext(line));
}

}

@Override
public void subscribe(Subscriber<? super String> subscriber) {
    subscribers.add(subscriber);

}

Then, I have a WorkQueue Processor (that should be the consumer):

WorkQueueProcessor<String> sink = WorkQueueProcessor.create();

// Here I subscribe to my publiser
publisher.subscribe(sink);

// Creates a Reactive Stream from the processor (having converted the lines to Quotations)
Flux<StockQuotation> mappedRS = sink.map(quotationConverter::convertHistoricalCSVToStockQuotation);

// Here I perform a number of stream transformations 

// Each call to consume will be executed in a separated Thread
filteredRS.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));

It works fine but is ugly as hell. In this example taken from the Spring Guides, they use an EventBus to route the events from the publisher to the consumer but, when I try to link it with my processor I get this compiler error:

eventBus.on($("quotes"),sink);

The method on(Selector, Consumer<T>) in the type EventBus is not applicable for the arguments (Selector<String>, WorkQueueProcessor<String>)

I´m lost here, what would be the best way to link a publisher with a Processor? Would you recommend using an EventBus? If so, what would be the proper invokation?

Thanks!


Solution

  • If you use the EventBus, you will publish your lines via

     eventBus.notify("quotes", Event.wrap(line);
    

    And subscribe via

    eventBus.on($("quotes"), e -> System.out.println(Thread.currentThread() + " data=" + e);
    

    where "e" is of Type Event<String>.