
Chain Observables


I have a collection of objects; call each one o. Each has an act() method, which will eventually cause that object's event() observable to call onComplete.

What is a good way to chain these?

That is, call o.act(), wait for o.event() to complete, then call o2.act() on the next object, and so on for an arbitrary number of objects in the collection.

So the signature is like so:

public class Item {
    // Typed to match the Observable<ReturnType> returned by event()
    protected final PublishSubject<ReturnType> event = PublishSubject.create();

    public Observable<ReturnType> event() {
        return event;
    }

    public void act() {
        // do a bunch of stuff, then signal completion
        event.onComplete();
    }
}

And then in the consuming code:

Collection<Item> items...
for each item in items:
  item.act() -> wait for item.event() to complete -> call the next item.act() -> and so on

Solution

  • If I understand correctly, your objects have this kind of signature:

    public class Item {
        public Observable<ReturnType> event()...
        public ReturnType act()...
    }
    

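    Suppose they were implemented like this, with event() wrapping act() via fromCallable so that act() runs when the observable is subscribed to: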

    public class Item {
    
        private final String data;
        private final Observable<ReturnType> event;
    
        public Item(String data) {
            this.data = data;
    
            event = Observable
                    .fromCallable(this::act);
        }
    
        public Observable<ReturnType> event() {
            return event;
        }
    
        public ReturnType act() {
            System.out.println("Item.act: " + data);
            return new ReturnType();
        }
    }
    

    They could then be chained like so:

    Item item1 = new Item("a");
    Item item2 = new Item("b");
    Item item3 = new Item("c");
    
    item1.event()
            .concatWith(item2.event())
            .concatWith(item3.event())
            .subscribe();
    

    Result:

    Item.act: a
    Item.act: b
    Item.act: c
    

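    If you have an Iterable collection, you can use concatMap, which subscribes to each item's observable only after the previous one has completed (flatMap merges the inner observables and does not guarantee that ordering):

    Iterable<Item> items = Arrays.asList(item1, item2, item3);
    
    Observable.from(items)
            .concatMap(Item::event)
            .subscribe();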
    

    Alternative

    An alternative that is closer to your code, keeping the PublishSubject, might be:

    public class Item {
        private final PublishSubject<Void> event = PublishSubject.create();
    
        private final String data;
    
        public Item(String data) {
            this.data = data;
        }
    
        public Observable<Void> event() {
            return event;
        }
    
        public Void act() {
            System.out.println("Item.act: " + data);
            // do a bunch of stuff
            event.onCompleted();
            return null;
        }
    }
    

    Usage:

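    Iterable<Item> iterable = Arrays.asList(item2, item3);
    
    // Once item1's event() completes, act() is called on the remaining items in order.
    item1.event()
            .concatWith(Observable.from(iterable)
                    .map(Item::act))
            .subscribe();
    
    // Trigger the chain: item1.act() completes item1.event().
    item1.act();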
    

    Note that this only uses event() for item1. For item2 onwards, act() is called directly and their event() observables are never subscribed to.
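The ordering the question asks for, where each act() is called only after the previous item has signalled completion, can also be sketched without RxJava. The following is a minimal illustration using java.util.concurrent.CompletableFuture as a stand-in for the observable. It is not the RxJava API: the class SequentialActs, the done() method (playing the role of event()), the shared log list, and runInOrder are all hypothetical names introduced for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SequentialActs {

    // Hypothetical stand-in for the question's Item: done() plays the role of
    // event(), and completing the future plays the role of the completion signal.
    static class Item {
        private final String data;
        private final List<String> log;
        private final CompletableFuture<Void> done = new CompletableFuture<>();

        Item(String data, List<String> log) {
            this.data = data;
            this.log = log;
        }

        CompletableFuture<Void> done() {
            return done;
        }

        void act() {
            // do a bunch of stuff, then signal completion
            log.add("Item.act: " + data);
            done.complete(null);
        }
    }

    // Calls act() on each item only after the previous item's future has completed.
    static CompletableFuture<Void> runInOrder(List<Item> items) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Item item : items) {
            chain = chain.thenCompose(ignored -> {
                item.act();
                return item.done();
            });
        }
        return chain;
    }

    // Builds three items, runs them in order, and returns what each act() logged.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Item> items = Arrays.asList(
                new Item("a", log), new Item("b", log), new Item("c", log));
        runInOrder(items).join();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because each step is attached with thenCompose, the next act() cannot start until the previous item's future completes, even if act() were to finish its work asynchronously. In RxJava 1.x the analogous shape is presumably a concatMap over the items with act() triggered on subscription (for example via doOnSubscribe); that variant is not tested here.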