Search code examples
spring, spring-boot, spring-data-r2dbc

Unexpected behaviour when using spring data reactive repository saveAll() in WebFilter with response body


I would like to call a repository's saveAll method in a WebFilter class, but when the handler returns a ServerResponse with a populated body, saveAll doesn't save anything to the DB. When the handler returns a response without a body, the data is saved to the DB.

Here are the sources that I use.

Router with handlers:

/**
 * Functional routing configuration exposing two GET endpoints: one that
 * answers 200 with a small text body and one that answers 200 with no body.
 */
@Configuration
@Slf4j
public class ResponseBodyRouter {

    /** Path of the endpoint whose response carries a body. */
    private static final String WITH_BODY_PATH = "/with-response-body";

    /** Path of the endpoint whose response is empty. */
    private static final String WITHOUT_BODY_PATH = "/without-response-body";

    /**
     * Wires both GET paths to their handlers.
     *
     * @return the router function exposed as a bean
     */
    @Bean
    public RouterFunction<ServerResponse> responseBodyRoute() {
        return route()
                .GET(WITH_BODY_PATH, this::okWithBody)
                .GET(WITHOUT_BODY_PATH, this::okWithoutBody)
                .build();
    }

    /**
     * Answers 200 OK with the fixed body {@code "Body\n"}.
     *
     * @param request the incoming request (not inspected)
     * @return the response carrying the body
     */
    private Mono<ServerResponse> okWithBody(ServerRequest request) {
        var ok = ServerResponse.ok();
        return ok.bodyValue("Body\n");
    }

    /**
     * Answers 200 OK without any body.
     *
     * @param request the incoming request (not inspected)
     * @return the body-less response
     */
    private Mono<ServerResponse> okWithoutBody(ServerRequest request) {
        var ok = ServerResponse.ok();
        return ok.build();
    }
}

Webfilter:

@Component
@Order(Ordered.LOWEST_PRECEDENCE)
@Slf4j
public class LogWebFilter implements WebFilter {

    @Autowired
    private UserOperationLogRepository userOperationLogRepository;

    @NonNull
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain
                .filter(exchange)
                .then(logMessage(exchange));
    }

    public Mono<Void> logMessage(ServerWebExchange exchange) {
        return Mono.deferContextual(ctx -> {
            var list = new ArrayList<UserOperationLog>();
            var userOperationLog = new UserOperationLog();

            userOperationLog.setAction((short) 1);
            userOperationLog.setPayload("payload");
            userOperationLog.setResource("Client");
            userOperationLog.setRowId(1L);
            userOperationLog.setCreatedAt(OffsetDateTime.now());
            userOperationLog.setUserId(1L);

            list.add(userOperationLog);

            return userOperationLogRepository.saveAll(list)
                    .log()
                    .collectList()
                    .then();
        });
    }
}

The bean:

/**
 * Entity mapped to the {@code user_operation_log} table.
 *
 * <p>Implements {@code Persistable<Long>} (parameterized with the type of the
 * {@code id} field rather than used as a raw type) so the entity itself
 * decides whether it counts as new.
 */
@Data
@Table("user_operation_log")
public class UserOperationLog implements Persistable<Long> {

    /** Primary key, stored in column {@code user_operation_log_id}. */
    @Id
    @Column("user_operation_log_id")
    protected Long id;

    @Column("user_id")
    protected Long userId;

    @Column("action")
    protected Short action;

    @Column("resource")
    protected String resource;

    @Column("row_id")
    protected Long rowId;

    @Column("payload")
    protected String payload;

    @Column("error")
    protected String error;

    @Column("created_at")
    protected OffsetDateTime createdAt;


    /**
     * Always reports this entity as new.
     *
     * <p>NOTE(review): presumably intended to make every save an insert —
     * confirm that rows are never updated through this entity.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isNew() {
        return true;
    }
}

Reactive stream signal logs without response body:

onSubscribe(FluxUsingWhen.UsingWhenSubscriber)
request(unbounded)
onNext(UserOperationLog(id=102, userId=1, action=1, resource=Client, rowId=1, payload=payload, error=null, createdAt=2024-04-05T12:20:07.990301862+02:00))
onComplete()

Reactive stream signal logs with response body:

onSubscribe(FluxUsingWhen.UsingWhenSubscriber)
request(unbounded)
cancel()

I have tried replacing the repository.saveAll with Mono.just(), which worked as expected.


Solution

  • I have overcome this problem with a different approach using the beforeCommit function in the WebFilter:

    @Component
    @Order(Ordered.LOWEST_PRECEDENCE)
    @Slf4j
    public class LogWebFilter implements WebFilter {
    
        @Autowired
        private UserOperationLogRepository userOperationLogRepository;
    
        @NonNull
        public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
            exchange.getResponse().beforeCommit(() -> Mono.deferContextual(ctx -> {
                var list = new ArrayList<UserOperationLog>();
                var userOperationLog = new UserOperationLog();
    
                userOperationLog.setAction((short) 1);
                userOperationLog.setPayload("payload");
                userOperationLog.setResource("Client");
                userOperationLog.setRowId(1L);
                userOperationLog.setCreatedAt(OffsetDateTime.now());
                userOperationLog.setUserId(1L);
    
                list.add(userOperationLog);
    
                return userOperationLogRepository.saveAll(list).collectList().then();
            }));
    
            return chain
                    .filter(exchange);
        }
    }