Tags: rx-java, couchbase, rx-java2, couchbase-java-api

RxJava: Retain the partial list of objects that were inserted into the Couchbase DB before an exception was thrown


 Observable
        .from(couchbaseDocuments)
        .subscribeOn(Schedulers.io())
        .flatMap(docToInsert->asyncBucket.insert(docToInsert))
        .retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
        .map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
        .toBlocking()
        .forEach(id->insertedIds.add(id));

Requirements:

  1. Bulk insert documents of a certain type into Couchbase.
  2. Create another document that contains just the ids of the docs inserted in the first step.
  3. If the first step fails at some id, stop inserting; the second document should then contain only the ids that were inserted before the exception occurred.
  4. The call is synchronous.

I am new to RxJava. I wrote the reactive code above, but it seems I have not clearly understood some concepts. My idea was that the forEach at the end would always receive each emitted item, and that if an exception occurred I would catch it and then use the insertedIds list to create the second document. However, the list always contains all the ids, which does not fulfill my requirements.

Can anyone please explain what is wrong with the code and how I could achieve the above requirements?


Solution

  • The retry methods resubscribe to the upstream Observable.

    In your case that means subscribing to couchbaseDocuments and potentially attempting to re-insert docs that have already successfully been inserted.

    Instead of retrying the entire stream again, you might prefer to just retry the insert that failed:

     Observable
            .from(couchbaseDocuments)
            .subscribeOn(Schedulers.io())
            .flatMap(docToInsert->asyncBucket.insert(docToInsert).retryWhen(...))
            .map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
            .toBlocking()
            .forEach(id->insertedIds.add(id));
    

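    Essentially, you only need to move a single parenthesis, so that retryWhen applies to the Observable returned by asyncBucket.insert rather than to the whole stream.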
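
    To make the difference between the two retry placements concrete, here is a minimal plain-Java sketch that only models the behaviour; it does not use RxJava or the Couchbase SDK. FlakyStore, retryWholeStream and retryPerInsert are hypothetical names invented for this illustration, and the store is assumed to fail exactly once on "doc2" and to accept repeated inserts of the same id, which a real bucket may not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the two retry placements. No RxJava is involved;
// loops stand in for subscriptions.
class RetryScopeDemo {

    // Hypothetical stand-in for asyncBucket.insert: fails once on "doc2".
    static class FlakyStore {
        final List<String> insertCalls = new ArrayList<>();
        private boolean failedOnce = false;

        String insert(String id) {
            insertCalls.add(id); // record every attempt, successful or not
            if (id.equals("doc2") && !failedOnce) {
                failedOnce = true;
                throw new IllegalStateException("temporary failure on " + id);
            }
            return id;
        }
    }

    // Models retry placed after flatMap: a failure restarts from the first doc.
    static List<String> retryWholeStream(List<String> docs, FlakyStore store, int maxRetries) {
        List<String> insertedIds = new ArrayList<>();
        for (int attempt = 0; ; attempt++) {
            try {
                for (String doc : docs) {
                    insertedIds.add(store.insert(doc));
                }
                return insertedIds;
            } catch (IllegalStateException e) {
                if (attempt == maxRetries) throw e; // retries exhausted
            }
        }
    }

    // Models retry placed inside flatMap: only the failing insert is repeated.
    static List<String> retryPerInsert(List<String> docs, FlakyStore store, int maxRetries) {
        List<String> insertedIds = new ArrayList<>();
        for (String doc : docs) {
            for (int attempt = 0; ; attempt++) {
                try {
                    insertedIds.add(store.insert(doc));
                    break; // this doc is done, move on to the next one
                } catch (IllegalStateException e) {
                    if (attempt == maxRetries) throw e; // retries exhausted
                }
            }
        }
        return insertedIds;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("doc1", "doc2", "doc3");

        FlakyStore a = new FlakyStore();
        System.out.println("whole stream: ids=" + retryWholeStream(docs, a, 3)
                + " calls=" + a.insertCalls);

        FlakyStore b = new FlakyStore();
        System.out.println("per insert:   ids=" + retryPerInsert(docs, b, 3)
                + " calls=" + b.insertCalls);
    }
}
```

    In this model the whole-stream variant calls insert for doc1 a second time and the downstream list sees doc1 twice, while the per-insert variant repeats only the doc2 call.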