Search code examples
javacouchbaseproject-reactor

Blocking upsert in couchbase


I am writing a test where I need to upsert some documents(let's say 10) in my couchbase bucket before actually running any tests. So I have a method annotated with @BeforeAll that tries to upset these documents. Now when I try to run the test, the test fails because documents were not persisted by then. In order to wait for these documents to be inserted I am doing something like this -


    Flux.fromIterable(couchDocs)
        .map(couchDoc -> bucket.upsert(couchDoc, persistTo)
        .collectList()
        .block();

But still when I run the test I can see that the documents were not persisted by then and the assertions fail. Am I missing something here?


Solution

  • Use .flatMap instead of .map. You inner stream is not getting subscribed to.