Tags: java, reactive-programming, rx-java, disruptor-pattern

Creating an Observable that batches like a ring buffer (advice needed)


After finding no proper solution to my problem described here, I've decided to implement one myself.

However, I lack experience working with monads, and things like lift(..) still look a bit magical to me.

I'm opening this question so that those who have implemented custom functionality on top of RxJava can give me advice on how to implement this.

So what is this all about? Here is the interface:

[Image: the proposed interface]

I suppose this will be self-explanatory for most of you, but to be sure I'll give an example.

Imagine we have a subscriber (consumer) that persists objects to a database. Whether you give it 1 or 1000 objects to persist, the cost won't differ by a factor of 1000; it will differ by a factor of 10 or much less, which means this consumer can adapt to load. So it is wasteful to push one item at a time when you can persist many at once. On the other hand, it is just as wasteful to wait for a batch of N elements to fill up before persisting: in one second you might get 1000 elements, in the next you might get none, so assume we don't know how frequently data arrives.

What we have now is Observable.buffer(), which asks for a batch size N, so we will often wait without doing any work. On the other side we have Disruptor, which does exactly what we want but does not provide the beautiful interface of Observable. Disruptor hands the consumer a single element, collects all incoming elements while that one is being processed, and the next time delivers a batch of everything that arrived while the consumer was busy with the last value.
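The batching behaviour described above can be sketched in plain Java, outside of both RxJava and Disruptor. This is only a minimal illustration assuming a single consumer thread; the class name AdaptiveBatcher and the method nextBatch are hypothetical, and the code uses nothing beyond java.util.concurrent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of RxJava or Disruptor.
final class AdaptiveBatcher {

    // Waits for at least one item, then also takes everything that has
    // piled up in the queue so far. Returns an empty list if interrupted.
    static <T> List<T> nextBatch(BlockingQueue<T> queue) {
        List<T> batch = new ArrayList<>();
        try {
            batch.add(queue.take());   // block until the first item arrives
            queue.drainTo(batch);      // grab the backlog without waiting
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        queue.add(1);
        System.out.println(nextBatch(queue)); // prints [1]

        // Three items arrive while the consumer is "busy".
        queue.add(2);
        queue.add(3);
        queue.add(4);
        System.out.println(nextBatch(queue)); // prints [2, 3, 4]
    }
}
```

The batch size is never configured: it is simply whatever accumulated since the previous call, which is the property the question is after.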

Currently I suppose I should implement this with Observable.from() or lift().

Please share your thoughts on this. Maybe there are already solutions for this that I'm unaware of, or maybe I'm about to implement it the wrong way.


Solution

  • Here is an operator that will batch values that piled up behind an asynchronous boundary:

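    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import rx.Observable.Operator;
    import rx.Scheduler;
    import rx.Subscriber;
    import rx.functions.Action0;
    // Internal RxJava 1.x class; not part of the public API.
    import rx.internal.util.atomic.SpscLinkedArrayQueue;
    import rx.schedulers.Schedulers;
    import rx.schedulers.TestScheduler;
    import rx.subjects.PublishSubject;

    public final class OperatorRequestBatcher<T> 
    implements Operator<List<T>, T> {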
        final Scheduler scheduler;
        public OperatorRequestBatcher(Scheduler scheduler) {
            this.scheduler = scheduler;
        }
        @Override
        public Subscriber<? super T> call(Subscriber<? super List<T>> t) {
            Scheduler.Worker w = scheduler.createWorker();
            RequestBatcherSubscriber<T> parent = 
                    new RequestBatcherSubscriber<>(t, w);
    
            t.add(w);
            t.add(parent);
    
            return parent;
        }
    
        static final class RequestBatcherSubscriber<T> 
        extends Subscriber<T> implements Action0 {
            final Subscriber<? super List<T>> actual;
            final Scheduler.Worker w;
            final Queue<T> queue;
            final AtomicInteger wip;
    
            volatile boolean done;
            Throwable error;
    
            public RequestBatcherSubscriber(
                    Subscriber<? super List<T>> actual, 
                    Scheduler.Worker w) {
                this.actual = actual;
                this.w = w;
                this.wip = new AtomicInteger();
                this.queue = new SpscLinkedArrayQueue<>(256);
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                queue.offer(t);
                schedule();
            }
    
            @Override
            public void onError(Throwable e) {
                if (done) {
                    return;
                }
                error = e;
                done = true;
                schedule();
            }
    
            @Override
            public void onCompleted() {
                done = true;
                schedule();
            }
    
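            // Schedules a drain only when wip goes from 0 to 1; otherwise a
            // drain is already pending or running and will see the new signal.
            void schedule() {
                if (wip.getAndIncrement() == 0) {
                    w.schedule(this);
                }
            }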
    
            @Override
            public void call() {
                int missed = 1;
    
                final Queue<T> q = queue;
                final Subscriber<? super List<T>> a = actual;
                final AtomicInteger wip = this.wip;
    
                for (;;) {
    
                    List<T> list = new ArrayList<>();
    
                    for (;;) {
                        boolean d = done;
                        T v = q.poll();
                        boolean e = v == null;
    
                        if (isUnsubscribed()) {
                            q.clear();
                            return;
                        }
    
                        if (d) {
                            Throwable err = error;
                            if (err != null) {
                                a.onError(err);
                                return;
                            } else if (e) {
                                if (!list.isEmpty()) {
                                    a.onNext(list);
                                }
                                a.onCompleted();
                                return;
                            }
                        }
    
                        if (e) {
                            break;
                        }
    
                        list.add(v);
                    }
    
                    if (!list.isEmpty()) {
                        a.onNext(list);
                    }
    
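                    // Subtract the signals handled so far; a non-zero result
                    // means more arrived during the drain, so loop again.
                    missed = wip.addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }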
                }
            }
        }
    
        public static void main(String[] args) {
            PublishSubject<Integer> ps = PublishSubject.create();
            TestScheduler sch = Schedulers.test();
    
            ps.lift(new OperatorRequestBatcher<>(sch))
            .subscribe(System.out::println, Throwable::printStackTrace, 
                    () -> System.out.println("Done"));
    
            ps.onNext(1);
            ps.onNext(2);
    
            sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    
            ps.onNext(3);
    
            sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    
            ps.onNext(4);
            ps.onNext(5);
            ps.onNext(6);
            ps.onCompleted();
    
            sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

            // Expected output, one line each: [1, 2]  [3]  [4, 5, 6]  Done
        }
    }
    

    Note, however, that what you described in the API is a form of hot Observable: cold sources don't coordinate across multiple Subscribers. For that you need to create a custom ConnectableObservable.

    publish() may work for disruptForEachSubscriber, but publish().observeOn() is unlikely to work for disruptForAllSubscriber, because observeOn will request a bunch of values and publish would interpret that as the successful processing of N batches.
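
The core of the operator above is the queue-drain loop in schedule() and call(): producers enqueue and bump a counter, and only the caller that moves the counter from 0 starts a drain. A stripped-down sketch of that pattern in plain Java, without RxJava, might look like the following. The names DrainingBatcher, submit and drain are hypothetical, termination and unsubscription handling are left out, and a list of Runnables stands in for the TestScheduler so the demo can run tasks by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical class illustrating the wip/missed drain loop only.
final class DrainingBatcher<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();
    private final Executor executor;
    private final Consumer<List<T>> consumer;

    DrainingBatcher(Executor executor, Consumer<List<T>> consumer) {
        this.executor = executor;
        this.consumer = consumer;
    }

    void submit(T item) {
        queue.offer(item);
        // Only the 0 -> 1 transition schedules a drain; other callers
        // just record that more work has arrived.
        if (wip.getAndIncrement() == 0) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        int missed = 1;
        for (;;) {
            List<T> batch = new ArrayList<>();
            T item;
            while ((item = queue.poll()) != null) {
                batch.add(item);
            }
            if (!batch.isEmpty()) {
                consumer.accept(batch);
            }
            // Subtract the signals accounted for; non-zero means submit()
            // was called in the meantime, so go around once more.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        List<Runnable> pending = new ArrayList<>();        // manual "scheduler"
        List<List<Integer>> batches = new ArrayList<>();
        DrainingBatcher<Integer> b =
                new DrainingBatcher<Integer>(pending::add, batches::add);

        b.submit(1);
        b.submit(2);
        pending.remove(0).run();   // drains [1, 2]

        b.submit(3);
        pending.remove(0).run();   // drains [3]

        System.out.println(batches); // prints [[1, 2], [3]]
    }
}
```

Because at most one drain is ever scheduled at a time, the consumer is never called concurrently, and anything submitted while it is busy ends up in the next batch.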