Trying to wrap my head around the reactor model and pipeline, I want to insert to mongo a couple of Users
, then for each user I would like to insert several (10) Offers
My current implementation include inserting 3 users to the database, block and insert the offers (only for 1 user) in a somewhat backward way, like so
Flux.just(u1, u2, u3).flatMap(u -> reactiveMongoTemplate.insert(u)).blockLast();
Arrays.asList(u1, u2, u3).forEach(user -> {
IntStream.range(0,10).forEach(i -> reactiveMongoTemplate.insert(new Offer(user)).subscribe());
The first line run fine, but I get the following exception
java.lang.IllegalStateException: state should be: open
Of course I can bypass this by inserting for each user separately, I don't know why this exception was raised and appreciate an answer about this issue as well
My main question is how to write it in the most reactive way, should I need to block in order to populate the entity Id
after insert or there is a better way?
The exact implementation of User
and Offer
doesn't really matter, it can be a any simple records, but here they are
@Document(collection = "users")
public class User extends BaseEntity {
private String name;
@Document(collection = "offers")
public class Offer extends BaseEntity {
private String title;
@JsonSerialize(using = ToStringSerializer.class)
private ObjectId user;
public Offer(){
this.title = "some title " + new Random().nextInt(10);
public Offer(User user){
this.user = new ObjectId(user.getId());
public void setUser(String userId) {
this.user = new ObjectId(userId);
is from spring-boot-starter-data-mongodb-reactive @EnableReactiveMongoRepositories
Turn out I was pretty close to the correct solution
Flux.just(u1, u2, u3).flatMap(u -> reactiveMongoTemplate.insert(u)).subscribe(u -> {
Flux.range(0,10).flatMap(i -> reactiveMongoTemplate.insert(new Offer(u))).subscribe();
now the code is truly reactive and it can be seen on the database as well (records are inserted with random order)