I'm using a Spring Cloud StreamBridge to publish messages to a RabbitMQ exchange. With the native RabbitMQ PerfTest i easily get up to 100k msgs/s (1 channel) using a single producer. If i launch a thread with a while loop with a sending StreamBrige (also 1 channel) i'm only getting ~20k msgs/s with similar settings (no persistence, no manual acks or confirms, same Docker containers..). I'm using Spring Cloud Stream and Rabbit Binder 3.2.2.
My yml looks like this:
spring:
rabbitmq:
host: localhost
port: 5672
cloud:
function:
definition: producer1;
stream:
bindings:
producer1-out-0:
destination: messageQueue
#requiredGroups: consumerGroup1,
rabbit:
bindings:
producer1-out-0:
producer:
deliveryMode: NON_PERSISTENT
exchangeType: direct
bindingRoutingKey: default_message
routingKeyExpression: '''default_message'''
#maxLength: 1
output-bindings: producer1;
and my sending loop, RabbitMQ PerfTest-Tool is written in Java and looks similar:
@Autowired
public StreamBridge streamBridge;
ExecutorService executorService = Executors.newFixedThreadPool(10);
@PostConstruct
public void launchProducer() {
Runnable task = () -> {
while (true){
streamBridge.send("producer1-out-0", "msg");
}
};
executorService.submit(task);
}
also in my console i'm getting a weird msg Channel 'unknown.channel.name' has 1 subscriber(s)
at startup and i don't know why.
Is the slow sending rate using StreamBridge a natural Spring limitation or do i have something misconfigured? Thanks for help :)
There will always be some overheade when using an abstraction on top of the native API; however, 5x doesn't sound right.
i'm using -x 1 -y 1 -a as arguments, means only 1 producer is publishing messages with auto consumer-acks
That probably explains it then; auto ack means no acks - the broker acks the message immediately when it is is sent to the consumer (risking message loss). The equivalent in Spring is Acknowledgemode.NONE
; it's default is for the container to ack each message individually.
See https://docs.spring.io/spring-amqp/docs/current/reference/html/#acknowledgeMode
and
https://docs.spring.io/spring-amqp/docs/current/reference/html/#batchSize
also
https://docs.spring.io/spring-amqp/docs/current/reference/html/#prefetchCount
Spring AMQP sets it to 250 by default, but SCSt's default is 1, which is significantly slower.
EDIT
Interesting; SCSt does appear to add some significant overhead over Spring Integration alone.
The following tests various scenarios from the native Java client and adding more and more Spring abstractions on top, finally using StreamBridge
; it should probably be profiled to see where the cost is and whether it can be mitigated.
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct
logging.level.root=warn
@SpringBootApplication
public class So71414000Application {
public static void main(String[] args) {
SpringApplication.run(So71414000Application.class, args).close();
}
@Bean
ApplicationRunner runner1(CachingConnectionFactory cf) throws Exception {
return args -> {
/*
* Native java API
*/
Connection conn = cf.getRabbitConnectionFactory().newConnection("amqp://localhost:1562");
Channel channel = conn.createChannel();
byte[] msg = "msg".getBytes();
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(1).build();
int count = 1000000;
StopWatch watch = watch("native");
IntStream.range(0, count).forEach(i -> {
try {
channel.basicPublish("foo", "", props, msg);
}
catch (IOException e) {
e.printStackTrace();
}
});
perf(count, watch);
channel.close();
conn.close();
};
}
@Bean
ApplicationRunner runner2(RabbitTemplate template) {
return args -> {
/*
* Single ChannelProxy, no cache, no conversion
*/
Message msg = MessageBuilder.withBody("msg".getBytes())
.andProperties(MessagePropertiesBuilder.newInstance()
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
StopWatch watch = watch("nocache");
int count = 1000000;
template.invoke(t -> {
IntStream.range(0, count).forEach(i -> t.send("foo", "", msg));
return null;
});
perf(count, watch);
};
}
@Bean
ApplicationRunner runner3(RabbitTemplate template) {
return args -> {
/*
* ChannelProxy (cached), no conversion
*/
Message msg = MessageBuilder.withBody("msg".getBytes())
.andProperties(MessagePropertiesBuilder.newInstance()
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
StopWatch watch = watch("cached channel");
int count = 1000000;
IntStream.range(0, count).forEach(i -> template.send("foo", "", msg));
perf(count, watch);
};
}
@Bean
ApplicationRunner runner4(RabbitTemplate template) {
return args -> {
/*
* ChannelProxy (cached), conversion
*/
StopWatch watch = watch("message conversion");
int count = 1000000;
IntStream.range(0, count).forEach(i -> template.convertAndSend("foo", "", "msg"));
perf(count, watch);
};
}
@Bean
ApplicationRunner runner5(RabbitTemplate template) {
return args -> {
/*
* Spring Integration
*/
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(template);
outbound.setExchangeName("foo");
outbound.setRoutingKey("");
DirectChannel channel = new DirectChannel();
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, outbound);
consumer.start();
GenericMessage<?> msg = new GenericMessage<>("foo".getBytes());
StopWatch watch = watch("Spring Integration");
int count = 1000000;
IntStream.range(0, count).forEach(i -> channel.send(msg));
perf(count, watch);
};
}
@Bean
ApplicationRunner runner6(StreamBridge bridge) {
return args -> {
/*
* Stream bridge
*/
StopWatch watch = watch("Stream Bridge");
int count = 1000000;
IntStream.range(0, count).forEach(i -> bridge.send("output", "msg"));
perf(count, watch);
};
}
private StopWatch watch(String name) {
StopWatch watch = new StopWatch();
watch.start(name);
return watch;
}
private void perf(int count, StopWatch watch) {
watch.stop();
System.out.println(watch.prettyPrint());
System.out.println((int) ((count) / (watch.getTotalTimeSeconds()) / 1000) + "k/s");
}
}
With these results on my MacBook Air (2018 1.6GHz I5) and a bare metal broker:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.6.4)
StopWatch '': running time = 10949129530 ns
---------------------------------------------
ns % Task name
---------------------------------------------
10949129530 100% native
91k/s
StopWatch '': running time = 14175481691 ns
---------------------------------------------
ns % Task name
---------------------------------------------
14175481691 100% nocache
70k/s
StopWatch '': running time = 16300449457 ns
---------------------------------------------
ns % Task name
---------------------------------------------
16300449457 100% cached channel
61k/s
StopWatch '': running time = 18206111556 ns
---------------------------------------------
ns % Task name
---------------------------------------------
18206111556 100% message conversion
54k/s
StopWatch '': running time = 26654581638 ns
---------------------------------------------
ns % Task name
---------------------------------------------
26654581638 100% Spring Integration
37k/s
StopWatch '': running time = 102734493141 ns
---------------------------------------------
ns % Task name
---------------------------------------------
102734493141 100% Stream Bridge
9k/s