Search code examples
observablerx-javacouchbasecouchbase-java-api

How to proceed to next document when DocumentAlreadyExistsException exception is thrown while inserting documents into couchbase using JavaRX?


I'm working on batch job which reads, transforms and writes documents to coucbase. I'm using Java to insert documents in bulk. I don't want to use upsert since I want to log if the document already exist in couchbase and insert is called. Please find my code below. When the document already exist, DocumentAlreadyExistsException is getting thrown and the job is getting stopped instead of proceeding with the next document. How to handle this issue?

Code:

    public void insertAll(Collection<JsonDocument> documents) {

    Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(final JsonDocument docToInsert) {
            return couchbaseConfig.catalogBucket().async().insert(docToInsert)
                    .doOnError((Throwable throwable) -> log.error(
                            "Exception {} occured while inerting document {} to cb", throwable.getMessage(),
                            docToInsert));
        }
    }).last().toBlocking().single();

}

Exception:

com.couchbase.client.java.error.DocumentAlreadyExistsException: null
at com.couchbase.client.java.bucket.api.Mutate$1$1.call(Mutate.java:154) ~[java-client-2.7.4.jar:na]
at com.couchbase.client.java.bucket.api.Mutate$1$1.call(Mutate.java:132) ~[java-client-2.7.4.jar:na]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69) ~[rxjava-1.3.8.jar:1.3.8]
at rx.observers.Subscribers$5.onNext(Subscribers.java:235) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.SingleProducer.request(SingleProducer.java:65) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:211) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103) ~[rxjava-1.3.8.jar:1.3.8]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:508) ~[core-io-1.7.4.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.access$000(AbstractGenericHandler.java:86) ~[core-io-1.7.4.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:526) ~[core-io-1.7.4.jar:na]
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.3.8.jar:1.3.8]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.InsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73) ~[rxjava-1.3.8.jar:1.3.8]
... 21 common frames omitted

Solution

  • You can use onErrorResumeNext to suppress the error, like this:

    Observable.from(docs)
      .flatMap(docToInsert ->
        bucket.insert(docToInsert)
          .doOnError(t -> System.out.println("oops, error inserting " + docToInsert.id() + " : " + t))
          .onErrorResumeNext(t ->
            t instanceof DocumentAlreadyExistsException ? Observable.empty() : Observable.error(t))
      ).toCompletable().await();
    

    Note that toCompleteable().await() works even if no items are emitted; single() would throw if all of the documents already exist.