Search code examples
spring-cloud-stream

Spring cloud stream : StreamBridge and Transaction


I would like to have a spring cloud stream listener handling a fulle transaction regarding all messages sent in it. Or, all message sent manually with StreamBridge in the function are commited even if there is an exception after.

This is my lib versions :

spring : 2.5.5
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4

My spring cloud stream conf :

spring:
  cloud:
    function:
      definition: test
    stream:
      rabbit:
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
        bindings:
          test-in-0:
            consumer:
              queueNameGroupOnly: true
              receive-timeout: 500
              transacted: true
          test-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
          other-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
      bindings:
        test-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            maxAttempts: 1
        test-out-0:
          destination: test.response
          group:  test.response
          producer:
            requiredGroups:  test.response
        other-out-0:
          destination: test.other.request
          group: test.other.request
          producer:
            requiredGroups: test.other.request

My test java code :

@Configuration
public class TestSender {
    @Bean
    public Function<Message<TestRequest>, Message<String>> test(Service service) {
        return (request) -> service.run(request.getPayload().getContent());
    }
}
@Component
@Transactional
public class Service {
    private static final Logger LOGGER = LoggerFactory.getLogger(Service.class);
    StreamBridge bridge;
    IWorker worker;
    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }

    @Transactional
    public Message<String> run(String message) {
        LOGGER.info("Processing {}", message);
        bridge.send("other-out-0", MessageBuilder.withPayload("test")
                .setHeader("toto", "titi").build());
        if (message.equals("error")) {
            throw new RuntimeException("test error");
        }
        return MessageBuilder.withPayload("test")
                .setHeader("toto", "titi").build();
    }
}

Test class to run :

@SpringBootApplication
public class EmptyWorkerApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmptyWorkerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            LOGGER.info("Sending messages ...");
template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                                "{\"content\":\"toto\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "{\"content\":\"error\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "{\"content\":\"titi\"}".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("application/json")
                                        .build());
        };
    }

I also added a TransactionManager :

@Configuration
@EnableTransactionManagement
public class TransactionManagerConfiguration {

    @Bean(name = "transactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        RabbitTransactionManager manager = new RabbitTransactionManager(cf);
        return manager;
    }

}

With this example at the end i have in my rabbit queues :

result

Or i was expected to have only 2 message on test.other.request. What i'm doing wrong ?

EDIT 1

Tryed code :

@Component("myfunction")
public class Myfunction implements Consumer<String> {

    private final StreamBridge streamBridge;

    public Myfunction(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Override
    @Transactional
    public void accept(String request) {
        this.streamBridge.send("myfunction-out-0", request);
        if (request.equals("error")) {
            throw new RuntimeException("test error");
        }
    }
}
@SpringBootApplication
public class EmptyWorkerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        RabbitTransactionManager manager = new RabbitTransactionManager(cf);
        return manager;
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                                "test".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("text/plain")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "error".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("text/plain")
                                        .build());
        template.convertAndSend("test.request", "#",
                                org.springframework.amqp.core.MessageBuilder.withBody(
                                        "test".getBytes(StandardCharsets.UTF_8))
                                        .setContentType("text/plain")
                                        .build());
        };
    }
}
spring:
  rabbitmq:
    host: xx
    port: xx
    username: xx
    password: xx
    virtual-host: xx
  cloud:
    function:
      definition: myfunction
    stream:
      rabbit:
         bindings:
           myfunction-in-0:
             queueNameGroupOnly: true
           myfunction-out-0:
             queueNameGroupOnly: true
             transacted: true
      bindings:
        myfunction-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            autoBindDlq: true
            maxAttempts: 1
        myfunction-out-0:
          destination: test.response
          group:  test.response
          producer:
            requiredGroups:  test.response

EDIT 2 :

I finally succeeded to make it work. My mistake was to set the property spring.cloud.stream.rabbit.bindings.myfunction-in-0.consumer.transacted=true instead of spring.cloud.stream.rabbit.bindings.myfunction-in.consumer.transacted=true

I actually dont understand the difference and i didn't found any explanation in both spring cloud stream and spring cloud rabbit binder documentation.


Solution

  • Please see this answer - Spring cloud stream : how to use @Transactional with new Consumer<> functional programming model There is also a link to a working example that I just did for a user in comments