Observable
.from(couchbaseDocuments)
.subscribeOn(Schedulers.io())
.flatMap(docToInsert->asyncBucket.insert(docToInsert))
.retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
.map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
.toBlocking()
.forEach(id->insertedIds.add(id));
Req :
I am new to RxJava. I wrote the reactive code above, but it seems I have not clearly understood some concepts. My idea was that the forEach at the end would receive each emitted item, and that if an exception occurred I would catch it and later use the insertedIds list to create the second documents. However, the list always contains all the ids, which doesn't fulfill my requirements.
Can anyone please explain what's wrong with the code and how I could achieve the above requirements?
The retry methods will resubscribe to the upstream Observable.
In your case that means resubscribing to everything above retryWhen, starting from Observable.from(couchbaseDocuments), and potentially attempting to re-insert docs that have already been inserted successfully.
Instead of retrying the entire stream again, you might prefer to just retry the insert that failed:
Observable
.from(couchbaseDocuments)
.subscribeOn(Schedulers.io())
.flatMap(docToInsert->asyncBucket.insert(docToInsert).retryWhen(...))
.map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
.toBlocking()
.forEach(id->insertedIds.add(id));
Essentially: you had to move a single parenthesis.
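
To make the difference concrete, here is a rough plain-Java model of the two retry placements. It deliberately does not use RxJava or the Couchbase SDK: FlakyBucket, retryWholeStream and retryEachInsert are hypothetical names invented for this sketch, the "stream" is just a sequential loop, and the failure is simulated with an IllegalStateException instead of a TemporaryFailureException. It only illustrates which inserts get repeated in each case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetryPlacementSketch {

    /** Hypothetical stand-in for the bucket: the first insert of "b" fails, later ones succeed. */
    static class FlakyBucket {
        final List<String> attempts = new ArrayList<>();
        private boolean failedOnce = false;

        String insert(String doc) {
            attempts.add(doc); // record every insert attempt, successful or not
            if (doc.equals("b") && !failedOnce) {
                failedOnce = true;
                throw new IllegalStateException("simulated temporary failure");
            }
            return doc;
        }
    }

    /** Models retry on the outer stream: a failure restarts from the first document. */
    static List<String> retryWholeStream(List<String> docs, FlakyBucket bucket, int maxRetries) {
        List<String> downstream = new ArrayList<>(); // what the final consumer sees
        for (int attempt = 0; ; attempt++) {
            try {
                for (String doc : docs) {
                    downstream.add(bucket.insert(doc));
                }
                return downstream;
            } catch (IllegalStateException e) {
                if (attempt >= maxRetries) throw e; // give up after maxRetries retries
            }
        }
    }

    /** Models retry inside flatMap: only the failed insert is repeated. */
    static List<String> retryEachInsert(List<String> docs, FlakyBucket bucket, int maxRetries) {
        List<String> downstream = new ArrayList<>();
        for (String doc : docs) {
            for (int attempt = 0; ; attempt++) {
                try {
                    downstream.add(bucket.insert(doc));
                    break; // this document is done, move on to the next one
                } catch (IllegalStateException e) {
                    if (attempt >= maxRetries) throw e;
                }
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("a", "b", "c");

        FlakyBucket outer = new FlakyBucket();
        System.out.println(retryWholeStream(docs, outer, 3) + " attempts=" + outer.attempts);
        // prints "[a, a, b, c] attempts=[a, b, a, b, c]"

        FlakyBucket inner = new FlakyBucket();
        System.out.println(retryEachInsert(docs, inner, 3) + " attempts=" + inner.attempts);
        // prints "[a, b, c] attempts=[a, b, b, c]"
    }
}
```

In the first variant "a" is inserted twice and reaches the consumer twice. This toy bucket accepts the duplicate silently; a real bucket would presumably reject the second insert of an existing document with an error that the retry filter does not cover, so the outer retry would not recover the stream either.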