spring-boot spring-data-mongodb flux project-reactor

Insert nested records to mongo in reactive fashion


I'm trying to wrap my head around the Reactor model and pipelines. I want to insert a couple of Users into Mongo, and then insert several (10) Offers for each user.

My current implementation inserts 3 users into the database, blocks, and then inserts the offers (only for 1 user) in a somewhat backward way, like so:

Flux.just(u1, u2, u3).flatMap(u -> reactiveMongoTemplate.insert(u)).blockLast();
Arrays.asList(u1, u2, u3).forEach(user -> {
        IntStream.range(0,10).forEach(i -> reactiveMongoTemplate.insert(new Offer(user)).subscribe());
    });

The first line runs fine, but then I get the following exception:

java.lang.IllegalStateException: state should be: open

Of course I can bypass this by inserting the offers for each user separately, but I don't know why this exception is raised, and I would appreciate an answer about that as well.

My main question is how to write this in the most reactive way: do I need to block in order to populate the entity id after the insert, or is there a better way?

The exact implementation of User and Offer doesn't really matter (they can be any simple records), but here they are:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "users")
public class User extends BaseEntity {

    private String name;
}

...

@Data
@Document(collection = "offers")
public class Offer extends BaseEntity {

    private String title;

    @JsonSerialize(using = ToStringSerializer.class)
    private ObjectId user;

    public Offer(){
        this.title = "some title " + new Random().nextInt(10);
    }

    public Offer(User user){
        this();
        this.user = new ObjectId(user.getId());
    }

    public void setUser(String userId) {
        this.user = new ObjectId(userId);
    }
}

reactiveMongoTemplate comes from spring-boot-starter-data-mongodb-reactive, with @EnableReactiveMongoRepositories enabled.

Thx


Solution

  • It turns out I was pretty close to the correct solution:

    Flux.just(u1, u2, u3).flatMap(u -> reactiveMongoTemplate.insert(u)).subscribe(u -> {
        Flux.range(0,10).flatMap(i -> reactiveMongoTemplate.insert(new Offer(u))).subscribe();
    });
    

    Now the code is fully reactive, and this is visible in the database as well: the records are inserted in no particular order.
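
    The nested subscribe() above can also be written as one pipeline, which may be easier to compose and to handle errors on. The following is only a minimal sketch, not the poster's code: insertUser and insertOffer are hypothetical in-memory stand-ins for reactiveMongoTemplate.insert(...), and the User and Offer records are simplified placeholders for the documents in the question. It assumes the saved entity is emitted with its id populated, so no blocking is needed between the two steps.

```java
import java.util.List;
import java.util.UUID;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NestedInsertSketch {

    // Simplified placeholders for the question's User and Offer documents.
    record User(String id, String name) {}
    record Offer(String id, String userId, String title) {}

    // Hypothetical stand-in for reactiveMongoTemplate.insert(user):
    // emits a copy of the user with a generated id.
    static Mono<User> insertUser(User u) {
        return Mono.fromSupplier(() -> new User(UUID.randomUUID().toString(), u.name()));
    }

    // Hypothetical stand-in for reactiveMongoTemplate.insert(offer).
    static Mono<Offer> insertOffer(Offer o) {
        return Mono.fromSupplier(() -> new Offer(UUID.randomUUID().toString(), o.userId(), o.title()));
    }

    // One pipeline: insert each user, then fan out to N offers per saved user.
    // The second flatMap only sees users that were already inserted,
    // so their ids are available without blocking.
    static Flux<Offer> insertUsersWithOffers(List<User> users, int offersPerUser) {
        return Flux.fromIterable(users)
                .flatMap(NestedInsertSketch::insertUser)
                .flatMap(saved -> Flux.range(0, offersPerUser)
                        .flatMap(i -> insertOffer(new Offer(null, saved.id(), "some title " + i))));
    }

    public static void main(String[] args) {
        List<User> users = List.of(
                new User(null, "u1"), new User(null, "u2"), new User(null, "u3"));

        // block() is used here only because this is a standalone demo;
        // in a reactive application the Flux would be returned to the caller.
        List<Offer> offers = insertUsersWithOffers(users, 10).collectList().block();
        System.out.println(offers.size()); // prints 30
    }
}
```

    With a single pipeline there is one subscription, so an error from any insert reaches the subscriber instead of being dropped inside a nested subscribe(). It also gives the caller one thing to wait on. A likely, though unconfirmed, cause of "state should be: open" in the original code is that the fire-and-forget subscribe() calls return immediately and the Mongo client is closed before the inserts complete.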