Tags: spring, rx-java, reactor, spring-data-couchbase, spring-webflux

ClassCastException thrown when calling deleteAll on a reactive Spring Data Couchbase repository


I have just tried the reactive support provided by Spring Data Couchbase.

There is a DataInitializer class to initialize the data for the application.

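import java.util.UUID;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

// Clears the repository and inserts two sample posts once the context is refreshed.
@Component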
@Slf4j
class DataInitializer {

    private final PostRepository posts;

    public DataInitializer(PostRepository posts) {
        this.posts = posts;
    }

    @EventListener(value = ContextRefreshedEvent.class)
    public void init() {
        log.info("start data initialization  ...");
        this.posts
            .deleteAll()
            .thenMany(
                Flux
                    .just("Post one", "Post two")
                    .flatMap(
                        title -> this.posts.save(Post.builder().id(UUID.randomUUID().toString()).title(title).content("content of " + title).build())
                    )
            )
            .log()
            .subscribe(
                null,
                null,
                () -> log.info("done initialization...")
            );

    }

}

But when I run the application, a ClassCastException is thrown.

22:56:09.212 [cb-computations-4] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.ClassCastException: rx.Observable cannot be cast to com.couchbase.client.java.view.AsyncViewRow)
22:56:09.214 [cb-computations-4] ERROR reactor.Flux.ConcatArray.1 - 
java.lang.ClassCastException: rx.Observable cannot be cast to com.couchbase.client.java.view.AsyncViewRow
    at org.springframework.data.couchbase.repository.support.SimpleReactiveCouchbaseRepository.lambda$deleteAll$8(SimpleReactiveCouchbaseRepository.java:254)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.producers.SingleProducer.request(SingleProducer.java:65)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
    at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:110)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:281)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:216)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: rx.Observable.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73)
    ... 22 common frames omitted

The complete code can be accessed from here. The repository also includes samples for other reactive Spring Data modules (Spring Data MongoDB, Spring Data Cassandra, etc.). I used the same DataInitializer code to initialize the data in those samples, and they all work fine.


Solution

  • This has been reported as a bug by a Spring developer; see DATACOUCH-356.
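
For readers trying to interpret the trace: the failing frame is a map lambda inside deleteAll that casts an rx.Observable to AsyncViewRow. That is the usual symptom of mapping over a nested stream where a flattening step was needed. The sketch below reproduces that class of error with plain java.util.stream, not with RxJava or the Couchbase SDK. The names NestedCastDemo, buggy and fixed are made up for illustration, and it is an assumption (not confirmed by the source) that the library bug has exactly this shape.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustration only: a hypothetical stand-in for "query result -> rows",
// not the actual Spring Data Couchbase code.
class NestedCastDemo {

    // map() keeps each element as a nested Stream, so casting the element
    // to the row type (String here) fails at runtime.
    public static List<String> buggy(List<List<String>> results) {
        Stream<Object> nested = results.stream().map(r -> (Object) r.stream());
        return nested.map(row -> (String) row).collect(Collectors.toList());
    }

    // flatMap() unwraps the nested streams, so each element really is a row.
    public static List<String> fixed(List<List<String>> results) {
        return results.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> results = Arrays.asList(
                Arrays.asList("row-1", "row-2"),
                Arrays.asList("row-3"));
        try {
            buggy(results);
        } catch (ClassCastException e) {
            System.out.println("buggy failed: " + e);
        }
        System.out.println("fixed: " + fixed(results));
    }
}
```

Running main reports the ClassCastException from buggy and then prints the flattened rows from fixed. None of this changes the answer above: the exception originates in the library, so it has to be fixed there rather than in DataInitializer.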