
Linearization in Reactor Netty (Spring Boot Webflux)


How can I guarantee linearizability of requests in Reactor Netty?

Theory:

Given:
Request A wants to write x=2, y=0
Request B wants to read x, y and write x=x+2, y=y+1
Request C wants to read x and write y=x
All requests are processed asynchronously and return to the client immediately with status ACCEPTED.

Example:
Send requests A, B, C in order.

Example Log Output: (request, thread name, x, y)
Request A, nioEventLoopGroup-2-0, x=2, y=0
Request C, nioEventLoopGroup-2-2, x=2, y=2
Request B, nioEventLoopGroup-2-1, x=4, y=3

Business logic requires all reads after A to see x=2 and y=0.
And request B to see x=2, y=0 and set y=1.
And request C to see x=4 and set y=4.

In short: the business logic makes every write operation depend on the completion of the previous write operation. Otherwise the operations are not reversible.
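The required behavior (asynchronous acceptance, but strictly ordered application of writes) can be sketched in plain Java with a single-threaded executor. The x/y state below is a hypothetical stand-in for the database:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderedWrites {
    // shared state, a hypothetical stand-in for the database
    static int x = 0, y = 0;

    public static void main(String[] args) throws InterruptedException {
        // a single-threaded executor has an unbounded FIFO queue:
        // tasks run one at a time, in submission order
        ExecutorService seq = Executors.newSingleThreadExecutor();

        seq.submit(() -> { x = 2; y = 0; });          // request A
        seq.submit(() -> { x = x + 2; y = y + 1; });  // request B: reads x=2, y=0
        seq.submit(() -> { y = x; });                 // request C: reads x=4

        seq.shutdown();
        seq.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("x=" + x + ", y=" + y);    // prints: x=4, y=4
    }
}
```

Submission is non-blocking (the caller can return ACCEPTED immediately), yet the effects are applied in arrival order.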

Example Code

Document:

@Document
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {

    @Id
    private String id;

    private int data;

    public Event withNewId() {
        setId(UUID.randomUUID().toString());
        return this;
    }

}

Repo:

public interface EventRepository extends ReactiveMongoRepository<Event, String> {}

Controller:

@RestController
@RequestMapping(value = "/api/event")
@RequiredArgsConstructor
public class EventHandler {
    private final EventRepository repo;

    @PostMapping
    public Mono<String> create(@RequestBody Event event) {
        return Mono.just(event.withNewId().getId())
                   .doOnNext(id ->
                       // do query based on some logic depending on event data
                       Mono.just(someQuery)
                           .flatMap(query ->
                               repo.find(query)
                                   .map(e -> {
                                       // Lombok @Data setters return void, so return the event explicitly
                                       event.setData(event.getData() + e.getData());
                                       return event;
                                   }))
                           .switchIfEmpty(Mono.just(event))
                           .flatMap(repo::save)
                           .subscribeOn(Schedulers.single())
                           .subscribe());
    }
    }

}

It does not work, but with subscribeOn I am trying to guarantee linearizability: concurrent requests A and B should always write their payloads to the DB in the order in which they are received by the server. Then, if another concurrent request C is a compound of first a read and then a write, it will read changes from the DB that reflect those of request B, not A, and write its own changes based on B's.

Is there a way in Reactor Netty to schedule executors with an unbounded FIFO queue, so that I can process the requests asynchronously but in order?
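What I am after, sketched with the plain JDK rather than Reactor (hypothetical `Serializer` name): keep a tail future and chain every incoming request onto it, so callers return immediately but the tasks execute strictly one after another.

```java
import java.util.concurrent.CompletableFuture;

// Serializes async tasks: each task starts only after the previous one
// completes, while submit() itself returns immediately (send ACCEPTED here).
public class Serializer {
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    public synchronized CompletableFuture<Void> submit(Runnable task) {
        tail = tail.thenRunAsync(task);  // chain onto the previously accepted request
        return tail;
    }
}
```

A per-entity variant would keep a `Map<String, Serializer>` so unrelated keys still run concurrently.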


Solution

  • I don't think this is specific to Netty or Reactor in particular, but rather to a broader topic - how to handle out-of-order and more-than-once message delivery. A few questions:

    1. Does the client always send the same number of requests in the same order? There's always a chance that, due to networking issues, the requests arrive out of order, or that one or more are lost.
    2. Does the client retry? What happens if the same request reaches the server twice?
    3. If the order matters, why doesn't the client wait for the result of the (n-1)th request before issuing the nth? In other words, why are there many concurrent requests?

    I'd try to redesign the operation so that a single request executes the operations on the backend in the required order, using concurrency only where necessary to speed up the process.

    If that's not possible, for example because you don't control the client, or more generally the order in which the events (requests) arrive, you have to implement the ordering in application-level logic, using per-message semantics. You can, for example, store or buffer the messages, wait for all of them to arrive, and only then trigger the business logic using the data from the messages in the correct order. This requires some kind of key (identity) that attributes messages to the same entity, and a sorting key by which you can sort the messages into the correct order.
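    A minimal sketch of that buffer-and-sort idea in plain Java (the `Msg` record and the fixed expected count are hypothetical; real code would derive them from the protocol):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PerKeyBuffer {
    // key attributes messages to an entity; seq is the sorting key
    public record Msg(String key, int seq, String payload) {}

    private final int expected;                   // messages expected per key
    private final Consumer<List<Msg>> onComplete; // business logic trigger
    private final Map<String, List<Msg>> buffers = new HashMap<>();

    public PerKeyBuffer(int expected, Consumer<List<Msg>> onComplete) {
        this.expected = expected;
        this.onComplete = onComplete;
    }

    // buffer until all messages for the key have arrived, then emit them sorted
    public synchronized void receive(Msg m) {
        List<Msg> buf = buffers.computeIfAbsent(m.key(), k -> new ArrayList<>());
        buf.add(m);
        if (buf.size() == expected) {
            buffers.remove(m.key());
            buf.sort(Comparator.comparingInt(Msg::seq)); // restore the intended order
            onComplete.accept(buf);
        }
    }
}
```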

    EDIT: After getting the answers, you can definitely implement it "the Reactor way".

    
        Sinks.Many<Event> sink = Sinks.many()  // you create a 'sink' the events will go to
                .multicast()                   // broadcasts all messages to all subscribers of the stream
                .directBestEffort();           // additional semantics - publishing fails when there are no subscribers - doesn't really matter here

        Flux<Event> eventFlux = sink.asFlux(); // the 'view' of the sink as a Flux you can subscribe to

        public void run() {
            subscribeAndProcess();
            sink.tryEmitNext(new Event("A", "A", "A"));
            sink.tryEmitNext(new Event("A", "C", "C"));
            sink.tryEmitNext(new Event("A", "B", "B"));

            sink.tryEmitNext(new Event("B", "A", "A"));
            sink.tryEmitNext(new Event("B", "C", "C"));
            sink.tryEmitNext(new Event("B", "B", "B"));
        }
    
    
        void subscribeAndProcess() {
            eventFlux.groupBy(Event::key)
                    .flatMap(
                            groupedEvents -> groupedEvents.distinct(Event::type) // distinct to avoid duplicates
                                    .buffer(3)                                   // there are three event types, so we buffer and wait for all to arrive
                                    .flatMap(events ->                           // once all the events are there we can do the processing the way we need
                                            Mono.just(events.stream()
                                                    .sorted(Comparator.comparing(Event::type))
                                                    .map(e -> e.key + e.value)
                                                    .reduce(String::concat)
                                                    .orElse(""))
                                    )
                    )
                    .subscribe(System.out::println);
        }
        // prints values concatenated in order per key: 
        // - AAABAC
        // - BABBBC
    

    See Gist: https://gist.github.com/tarczynskitomek/d9442ea679e3eed64e5a8470217ad96a

    There are a few caveats:

    • If all of the expected events for a given key don't arrive, you waste memory on buffering - unless you set a timeout
    • How will you ensure that all the events for a given key go to the same application instance?
    • How will you recover from failures encountered mid-processing?
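    The first caveat (bounding the wait with a timeout) could be addressed roughly like this; all types and method names here are hypothetical, and a real system would dead-letter the dropped events rather than just log them:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: evict buffered events for a key after a deadline so that
// groups which never complete don't leak memory.
public class BufferEviction {
    private final Map<String, List<String>> buffers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // open a buffer for the key and schedule its eviction
    public void startBuffer(String key, long timeoutMillis) {
        buffers.put(key, new CopyOnWriteArrayList<>());
        scheduler.schedule(() -> {
            List<String> dropped = buffers.remove(key);
            if (dropped != null) {
                // the group never completed in time: log it, dead-letter it, etc.
                System.out.println("evicted incomplete buffer for key " + key);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public void addEvent(String key, String event) {
        List<String> buf = buffers.get(key);
        if (buf != null) buf.add(event);  // events for evicted keys are ignored
    }

    // called once all expected events arrived and processing was triggered
    public void complete(String key) {
        buffers.remove(key);
    }

    public boolean hasBuffer(String key) {
        return buffers.containsKey(key);
    }
}
```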

    Having all this in mind, I would go with persistent storage - say, saving the incoming events in the database and doing the processing in the background - and for that you don't need Reactor. Most of the time a simple Servlet-based Spring app will be far easier to maintain and develop, especially if you have no previous experience with Functional Reactive Programming.