I would like a Spring Cloud Stream listener to handle a full transaction covering all the messages sent within it. However, every message sent manually with StreamBridge inside the function is committed, even if an exception is thrown afterwards.
These are my library versions:
spring : 2.5.5
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
My Spring Cloud Stream configuration:
spring:
  cloud:
    function:
      definition: test
    stream:
      rabbit:
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
        bindings:
          test-in-0:
            consumer:
              queueNameGroupOnly: true
              receive-timeout: 500
              transacted: true
          test-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
          other-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
      bindings:
        test-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            maxAttempts: 1
        test-out-0:
          destination: test.response
          group: test.response
          producer:
            requiredGroups: test.response
        other-out-0:
          destination: test.other.request
          group: test.other.request
          producer:
            requiredGroups: test.other.request
My test Java code:
@Configuration
public class TestSender {

    @Bean
    public Function<Message<TestRequest>, Message<String>> test(Service service) {
        return (request) -> service.run(request.getPayload().getContent());
    }
}

@Component
@Transactional
public class Service {

    private static final Logger LOGGER = LoggerFactory.getLogger(Service.class);

    StreamBridge bridge;
    IWorker worker;

    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }

    @Transactional
    public Message<String> run(String message) {
        LOGGER.info("Processing {}", message);
        bridge.send("other-out-0", MessageBuilder.withPayload("test")
                .setHeader("toto", "titi").build());
        if (message.equals("error")) {
            throw new RuntimeException("test error");
        }
        return MessageBuilder.withPayload("test")
                .setHeader("toto", "titi").build();
    }
}
Test class to run:
@SpringBootApplication
public class EmptyWorkerApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmptyWorkerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    // Publishes three requests at startup; the second one triggers the exception.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            LOGGER.info("Sending messages ...");
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "{\"content\":\"toto\"}".getBytes(StandardCharsets.UTF_8))
                            .setContentType("application/json")
                            .build());
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "{\"content\":\"error\"}".getBytes(StandardCharsets.UTF_8))
                            .setContentType("application/json")
                            .build());
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "{\"content\":\"titi\"}".getBytes(StandardCharsets.UTF_8))
                            .setContentType("application/json")
                            .build());
        };
    }
}
I also added a TransactionManager:
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfiguration {

    @Bean(name = "transactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        RabbitTransactionManager manager = new RabbitTransactionManager(cf);
        return manager;
    }
}
With this example, the message sent through StreamBridge for the failing request still ends up in my RabbitMQ queues, whereas I expected to have only 2 messages on test.other.request. What am I doing wrong?
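The expected outcome can be sketched in plain Java, without Spring or RabbitMQ. This is only a simulation of the semantics being asked for, not the binder's implementation: `TransactedChannel` and `process` are hypothetical stand-ins that buffer sends and publish them only on commit, and `run` mirrors the send-then-fail logic of `Service.run` above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of the expected behaviour; no Spring or RabbitMQ involved.
final class TransactedSendSketch {

    /** Hypothetical stand-in for a transacted channel: sends are buffered until commit. */
    static final class TransactedChannel {
        private final List<String> pending = new ArrayList<>();
        private final List<String> queue;

        TransactedChannel(List<String> queue) {
            this.queue = queue;
        }

        void send(String payload) {
            pending.add(payload);
        }

        void commit() {
            queue.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    /** Mirrors Service.run from the question: send first, then fail on "error". */
    static void run(TransactedChannel channel, String content) {
        channel.send("test");
        if (content.equals("error")) {
            throw new RuntimeException("test error");
        }
    }

    /** Handles each request in its own simulated transaction and returns the target queue. */
    static List<String> process(List<String> requests) {
        List<String> otherRequestQueue = new ArrayList<>();
        for (String content : requests) {
            TransactedChannel channel = new TransactedChannel(otherRequestQueue);
            try {
                run(channel, content);
                channel.commit();   // success: buffered message becomes visible
            } catch (RuntimeException e) {
                channel.rollback(); // failure: buffered message is discarded
            }
        }
        return otherRequestQueue;
    }

    public static void main(String[] args) {
        List<String> queue = process(Arrays.asList("toto", "error", "titi"));
        System.out.println("Messages on test.other.request: " + queue.size());
    }
}
```

With the three requests from the runner (`toto`, `error`, `titi`), this prints `Messages on test.other.request: 2`, which is the count expected from a fully transactional listener.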
EDIT 1
Code I tried:
@Component("myfunction")
public class Myfunction implements Consumer<String> {

    private final StreamBridge streamBridge;

    public Myfunction(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Override
    @Transactional
    public void accept(String request) {
        this.streamBridge.send("myfunction-out-0", request);
        if (request.equals("error")) {
            throw new RuntimeException("test error");
        }
    }
}

@SpringBootApplication
public class EmptyWorkerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        RabbitTransactionManager manager = new RabbitTransactionManager(cf);
        return manager;
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "test".getBytes(StandardCharsets.UTF_8))
                            .setContentType("text/plain")
                            .build());
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "error".getBytes(StandardCharsets.UTF_8))
                            .setContentType("text/plain")
                            .build());
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "test".getBytes(StandardCharsets.UTF_8))
                            .setContentType("text/plain")
                            .build());
        };
    }
}
spring:
  rabbitmq:
    host: xx
    port: xx
    username: xx
    password: xx
    virtual-host: xx
  cloud:
    function:
      definition: myfunction
    stream:
      rabbit:
        bindings:
          myfunction-in-0:
            queueNameGroupOnly: true
          myfunction-out-0:
            queueNameGroupOnly: true
            transacted: true
      bindings:
        myfunction-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            autoBindDlq: true
            maxAttempts: 1
        myfunction-out-0:
          destination: test.response
          group: test.response
          producer:
            requiredGroups: test.response
EDIT 2
I finally managed to make it work.
My mistake was to set the property
spring.cloud.stream.rabbit.bindings.myfunction-in-0.consumer.transacted=true
instead of
spring.cloud.stream.rabbit.bindings.myfunction-in.consumer.transacted=true
I don't actually understand the difference, and I did not find any explanation in either the Spring Cloud Stream or the Spring Cloud Stream Rabbit binder documentation.
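For reference, the functional programming model in Spring Cloud Stream names bindings `<functionName>-in-<index>` and `<functionName>-out-<index>`. The small sketch below only builds the property keys that follow from that convention; the class and helper names are hypothetical and purely illustrative, not part of any Spring API.

```java
// Illustrative only: derives binding names and Rabbit consumer property keys
// from the documented <functionName>-in-<index> / -out-<index> convention.
final class BindingNameSketch {

    /** Input binding name for a function, e.g. "myfunction-in-0". */
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Output binding name for a function, e.g. "myfunction-out-0". */
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    /** Full key of a Rabbit binder consumer property for the given binding. */
    static String rabbitConsumerProperty(String bindingName, String property) {
        return "spring.cloud.stream.rabbit.bindings." + bindingName + ".consumer." + property;
    }

    public static void main(String[] args) {
        String binding = inputBinding("myfunction", 0);
        System.out.println(rabbitConsumerProperty(binding, "transacted"));
    }
}
```

Under that convention, the key built for the first input of `myfunction` contains `myfunction-in-0`, so a key written with `myfunction-in` would appear to refer to a binding with a different name. Why the shorter key changed the behaviour in this setup is not explained by the naming convention alone.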
Please see this answer: "Spring cloud stream : how to use @Transactional with new Consumer<> functional programming model". It also contains a link to a working example that I just put together for a user in the comments.