I am using Spring Cloud Stream 3.0.6 (Cloud: Hoxton.SR6, Boot 2.3.0.RELEASE) in combination with Solace PubSub+. I can't get concurrent consumers to work. Whatever I configure, there is always a single thread executing each incoming message in turn.
Here is my StreamListener
code:
@StreamListener(JobTriggerEventConsumerBinding.INPUT)
protected void onJobTriggerEvent(org.springframework.messaging.Message<JobExecutionTriggerEvent> message,
JobExecutionTriggerEvent event,
MessageHeaders headers) throws InterruptedException {
log.info("Processing on thread: " + Thread.currentThread().getId());
Thread.sleep(5000);
log.info("Received the event!");
log.info("-- Raw message: {}", message);
log.info("-- Headers: {}", headers);
log.info("-- Event: {}", event);
log.info("-- Event Contents: {}", event.getMessage());
}
If I am sending 3 messages to the input channel (using a producer app I have written) I see the messages being processed on the same thread (with the same ID) and sequentially. What I would like to achieve is that the messages are processed concurrently by 3 threads.
My application.yml
looks as follows:
spring:
cloud:
stream:
default:
group: defaultConsumers
consumer:
concurrency: 3
bindings:
jobTriggers:
group: jobTriggerConsumers
consumer:
concurrency: 3
max-attempts: 1
solace:
bindings:
jobTriggers:
consumer:
requeue-rejected: true
My pom.xml
contains the following dependencies:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- Dependency to Solace PubSub+ Spring Cloud Stream integration (binder) -->
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>2.0.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cloud-connectors</artifactId>
</exclusion>
</exclusions>
</dependency>
Could it be, that this is an issue of the Solace PubSub+ binder? I have read here that the behaviour of spring.cloud.stream.binders.<name>.consumer.concurrency
may depend on the binder's implementation.
What could be the issue here?
# docker-compose -f PubSubStandard_singleNode.yml up
version: '3.3'
services:
primary:
container_name: pubSubStandardSingleNode
image: solace/solace-pubsub-standard:latest
shm_size: 1g
ulimits:
core: 1
nofile:
soft: 2448
hard: 38048
ports:
#Port Mappings: Ports are mapped straight through from host to
#container. This may result in port collisions on commonly used
#ports that will cause failure of the container to start.
#Web transport
- '80:80'
#Web transport over TLS
- '443:443'
#SEMP over TLS
- '943:943'
#MQTT Default VPN
#- '1883:1883'
#AMQP Default VPN over TLS
- '5671:5671'
#AMQP Default VPN
- '5672:5672'
#MQTT Default VPN over WebSockets
#- '8000:8000'
#MQTT Default VPN over WebSockets / TLS
#- '8443:8443'
#MQTT Default VPN over TLS
#- '8883:8883'
#SEMP / PubSub+ Manager
- '8080:8080'
#REST Default VPN
#- '9000:9000'
#REST Default VPN over TLS
#- '9443:9443'
#SMF
- '55555:55555'
#SMF Compressed
#- '55003:55003'
#SMF over TLS
- '55443:55443'
environment:
- username_admin_globalaccesslevel=admin
- username_admin_password=admin
- system_scaling_maxconnectioncount=100
Ok, I will answer that question myself.
Trying out the configurations above with RabbitMQ binder and a running rabbit instance, concurrency worked just fine. So I assumed it must be the Solace binder that is making problems.
After some googling I indeed found confirmation: https://github.com/SolaceProducts/solace-spring-cloud/issues/7
Apparently concurrency is currently not supported by Solace PubSub+ binder, which is a real bummer. At least it seems the problem is being worked on.
Here also some community discussion: https://solace.community/discussion/284/concurrency-property-with-solace-spring-cloud-stream-api#latest
UPDATE
This issue seems to have been fixed in version 2.1.1
of Spring Cloud Stream Solace binder. I.e. using this dependency
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>spring-cloud-starter-stream-solace</artifactId>
<version>2.1.1</version
</dependency>
If you are using the Spring Cloud Solace BOM, you need to go to version 1.1.1
at least:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.solace.spring.cloud</groupId>
<artifactId>solace-spring-cloud-bom</artifactId>
<version>1.1.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
This works with Spring Cloud Hoxton.SR6
.