I am having problems doing the following using Couchbase Java client 2.2.2 and Rx Observables 1.0.15:
What I have come up with so far looks really mean:
List<E> resultList = new ArrayList<>();
Observable
.from(originalDocumentNames)
.flatmap(key -> {
Observable firstDocument = bucket.async().get(key);
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
return Observable.merge(firstDocument, secondDocument);
})
.reduce((jsonDocument1, jsonDocument2) -> {
if (jsonDocument1 == null || jsonDocument2 == null) {
return null;
}
resultList.add(createCustomObject(jsonDocument1, jsonDocument2);
return null;
})
.filter(Objects.nonNull)
.singleOrDefault(null)
.subscribe(new Subscriber<E>() {
public void onComplete() {
//use resultList in a callback function
}
});
This does not work. I do not know where, but I think I am using Observable.merge
the wrong way.
Also I think I am approaching the whole problem the wrong way.
So the main questions it seems are:
You could use zip
inside the flatmap. Zip will emit as many items as the Observable
with the fewest items. So if one of the documents is missing, its sequence will be empty and zip will skip it.
Observable
.from(originalDocumentNames)
.flatmap(key -> {
//the stream of 0-1 original document
Observable firstDocument = bucket.async().get(key);
//the stream of 0-1 associated document
Observable secondDocument = bucket.async().get(getSecondKeyNameFrom(key));
//using zip and the createCustomObject method reference as a zip function to combine pairs of documents
return Observable.zip(firstDocument, secondDocument, this::createCustomObject);
})
.toList() //let RxJava aggregate into a List
.subscribe(
//the "callback" function, onNext will be called only once with toList
list -> doSomething(list),
//always try to define onError (best practice)
e -> processErrors(e)
);