Search code examples
javacouchbaserx-javaobservablecouchbase-java-api

Rx Observables: emit additional item for each original item, reduce them to another type, consume


I am having problems doing the following using Couchbase Java client 2.2.2 and Rx Observables 1.0.15:

  • I have a list of strings which are document names
  • Along with each original document for a document name I would like to load another document (deduced from the original document name) so I would get a pair of documents. If any of those two documents do not exist, do not use this pair any more.
  • If the pair is valid (i.e. both documents exist) then use both documents to create a custom object from them
  • combine those transformed items into a list

What I have come up with so far looks really mean:

List<E> resultList = new ArrayList<>();

Observable
    .from(originalDocumentNames)
    .flatmap(key -> {
        Observable firstDocument = bucket.async().get(key);
        Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
        return Observable.merge(firstDocument, secondDocument);
    })
    .reduce((jsonDocument1, jsonDocument2) -> {
        if (jsonDocument1 == null || jsonDocument2 == null) {
            return null;
        }
        resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
        return null;
    })
    .filter(Objects.nonNull)
    .singleOrDefault(null)
    .subscribe(new Subscriber<E>() {
        public void onComplete() {
            //use resultList in a callback function
        }
    });

This does not work. I do not know where, but I think I am using Observable.merge the wrong way. Also I think I am approaching the whole problem the wrong way.

So the main questions it seems are:

  • how do I emit an additional item to an Observable stream?
  • how can I reduce two items into an item of another type? (reduce(T, T, T) does not allow that)
  • am I taking it on wrong?

Solution

  • You could use zip inside the flatmap. Zip will emit as many items as the Observable with the fewest items. So if one of the documents is missing, its sequence will be empty and zip will skip it.

    Observable
    .from(originalDocumentNames)
    .flatmap(key -> {
        //the stream of 0-1 original document
        Observable firstDocument = bucket.async().get(key);
        //the stream of 0-1 associated document
        Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
    
        //using zip and the createCustomObject method reference as a zip function to combine pairs of documents
        return Observable.zip(firstDocument, secondDocument, this::createCustomObject);
    })
    .toList() //let RxJava aggregate into a List
    .subscribe(
        //the "callback" function, onNext will be called only once with toList
        list -> doSomething(list), 
        //always try to define onError (best practice)
        e -> processErrors(e)
    );