spring-boot spring-cloud-stream spring-cloud-function

Spring Cloud Stream: how to use @Transactional with the new Consumer<> functional programming model


I have a StreamListener that I would like to replace with the new functional model and Consumer<>. Unfortunately, I don't know how to carry @Transactional over to the new model:

@Transactional
@StreamListener(PaymentChannels.PENDING_PAYMENTS_INPUT)
public void executePayments(PendingPaymentEvent event) throws Exception {

    paymentsService.triggerInvoicePayment(event.getInvoiceId());
}

I have tried a few things; sample code is below. For testing, I added a step that sends each event to a separate queue, and then I throw an exception to trigger a rollback. The messages do not appear in the queue until the method completes (I verified this with breakpoints), which suggests the send is transacted. Even so, they are still queued after the exception is thrown, so the transaction seems to be committed automatically despite the error.

@Transactional
@RequiredArgsConstructor
@Component
// Bean name stays "functionalPayment", so the functionalPayment-in-0 binding still applies.
public class FunctionalPayment implements Consumer<PendingPaymentEvent> {
    private final PaymentsService paymentsService;
    private final StreamBridge streamBridge;

    @Override
    public void accept(PendingPaymentEvent event) {
        paymentsService.triggerInvoicePayment(event.getInvoiceId());

        // Test only: publish to a second binding, then fail to see whether the send rolls back.
        streamBridge.send("log-out-0", event);
        throw new RuntimeException("Test exception to rollback message from log-out-0");
    }
}

Configuration:

spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.queue-name-group-only=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.declare-exchange=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.bind-queue=true
spring.cloud.stream.rabbit.bindings.functionalPayment-in-0.consumer.transacted=true

spring.cloud.stream.source=log

spring.cloud.stream.bindings.log-out-0.content-type=application/json
spring.cloud.stream.bindings.log-out-0.destination=log_a
spring.cloud.stream.bindings.log-out-0.group=log_a
spring.cloud.stream.rabbit.bindings.log-out-0.producer.declare-exchange=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.bind-queue=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.queue-name-group-only=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.binding-routing-key=log
spring.cloud.stream.rabbit.bindings.log-out-0.producer.transacted=true
spring.cloud.stream.rabbit.bindings.log-out-0.producer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.log-out-0.producer.routing-key-expression='log'
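
For reference, the binding names in the properties above follow Spring Cloud Stream's functional naming convention: `<name>-in-<index>` for inputs and `<name>-out-<index>` for outputs. Here the input name comes from the consumer bean (`functionalPayment`) and the output name from the `spring.cloud.stream.source=log` entry. The small helper below only spells that convention out; `BindingNames` and its methods are hypothetical names for illustration, not part of any Spring API.

```java
public class BindingNames {

    /** Builds an input binding name such as "functionalPayment-in-0". */
    static String input(String name, int index) {
        return name + "-in-" + index;
    }

    /** Builds an output binding name such as "log-out-0". */
    static String output(String name, int index) {
        return name + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("functionalPayment", 0)); // functionalPayment-in-0
        System.out.println(output("log", 0));              // log-out-0
    }
}
```

If the consumer bean is registered under a different name, the `-in-0` property keys need to change to match.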

Solution

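  • Have you tried something along the lines of the following?

    @Transactional
    public class ExecutePaymentConsumer implements Consumer<PendingPaymentEvent> {
       private final PaymentsService paymentsService;

       public ExecutePaymentConsumer(PaymentsService paymentsService) {
           this.paymentsService = paymentsService;
       }

       @Override
       public void accept(PendingPaymentEvent event) {
           paymentsService.triggerInvoicePayment(event.getInvoiceId());
       }
    }
    . . .
    @Bean
    public ExecutePaymentConsumer executePayments(PaymentsService paymentsService) {
        return new ExecutePaymentConsumer(paymentsService);
    }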
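
Some background on why the class-level annotation matters: Spring applies @Transactional through a proxy that wraps each call to accept, so the instance the binder invokes has to be the Spring-managed, proxied bean. The sketch below imitates that wrapping in plain Java, with no Spring on the classpath. `TxSimulation`, `Outbox`, and `transactional` are hypothetical names made up for this illustration, not Spring or Spring Cloud Stream APIs, and the staged-send behaviour is only an assumption about what a transacted producer is expected to do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class TxSimulation {

    /** Hypothetical stand-in for a transacted channel: sends are staged until commit. */
    static class Outbox {
        private final List<String> staged = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        void send(String message) { staged.add(message); }

        void commit() {
            delivered.addAll(staged);
            staged.clear();
        }

        void rollback() { staged.clear(); }

        List<String> delivered() { return delivered; }
    }

    /** Wraps a consumer the way a transactional proxy would: commit on success, roll back on RuntimeException. */
    static <T> Consumer<T> transactional(Consumer<T> delegate, Outbox outbox) {
        return event -> {
            try {
                delegate.accept(event);
                outbox.commit();
            } catch (RuntimeException e) {
                outbox.rollback();
                throw e; // rethrow so the caller still sees the failure
            }
        };
    }

    public static void main(String[] args) {
        Outbox outbox = new Outbox();

        // Mirrors the test in the question: send, then throw.
        Consumer<String> failing = transactional(event -> {
            outbox.send("log:" + event);
            throw new RuntimeException("Test exception to trigger rollback");
        }, outbox);
        try {
            failing.accept("invoice-1");
        } catch (RuntimeException expected) {
            // the staged message was discarded by rollback()
        }
        System.out.println(outbox.delivered()); // []

        // Same wrapper, no exception: the staged message is committed.
        Consumer<String> succeeding = transactional(event -> outbox.send("log:" + event), outbox);
        succeeding.accept("invoice-2");
        System.out.println(outbox.delivered()); // [log:invoice-2]
    }
}
```

In a real application the commit and rollback are driven by the configured transaction manager rather than by hand-written code like this. If the messages are still published after the exception, it is worth checking that the invoked bean really is the proxied one and that the outbound send takes part in the same transaction.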