Spring Cloud Stream - Solace PubSub+ - Consumer Concurrency


I am using Spring Cloud Stream 3.0.6 (Cloud: Hoxton.SR6, Boot 2.3.0.RELEASE) in combination with Solace PubSub+. I can't get concurrent consumers to work. Whatever I configure, there is always a single thread processing the incoming messages one after another.

Here is my StreamListener code:

    @StreamListener(JobTriggerEventConsumerBinding.INPUT)
    protected void onJobTriggerEvent(org.springframework.messaging.Message<JobExecutionTriggerEvent> message, 
                                     JobExecutionTriggerEvent event, 
                                     MessageHeaders headers) throws InterruptedException {
        
        log.info("Processing on thread: {}", Thread.currentThread().getId());
      
        Thread.sleep(5000);
        
        log.info("Received the event!");
        log.info("-- Raw message:    {}", message);
        log.info("-- Headers:        {}", headers);
        log.info("-- Event:          {}", event);
        log.info("-- Event Contents: {}", event.getMessage());
    }

If I send 3 messages to the input channel (using a producer app I have written), I see the messages being processed sequentially on the same thread (with the same ID). What I would like to achieve is that the messages are processed concurrently by 3 threads.
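To check the symptom without comparing logged thread IDs by eye, a small probe along the following lines could be called at the top of the listener. This is only a sketch: `ThreadUsageProbe` and its methods are hypothetical names, not part of Spring Cloud Stream or the Solace binder, and `simulate` merely stands in for a binder by delivering messages from a plain thread pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that records which threads handled messages. */
class ThreadUsageProbe {
    // Thread-safe set, because several consumer threads may record at once.
    private final Set<String> threadNames = ConcurrentHashMap.newKeySet();

    /** Call this at the top of the listener method. */
    void record() {
        threadNames.add(Thread.currentThread().getName());
    }

    /** Number of distinct threads seen so far. */
    int distinctThreads() {
        return threadNames.size();
    }

    /**
     * Stand-in for a binder (assumption, not real binder code): delivers
     * `messages` messages on a fixed pool of `consumers` threads and returns
     * how many distinct threads did the work.
     */
    static int simulate(int consumers, int messages) {
        ThreadUsageProbe probe = new ThreadUsageProbe();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                probe.record();
                try {
                    Thread.sleep(100); // pretend to process the message
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return probe.distinctThreads();
    }
}
```

With one simulated consumer, all three messages land on a single thread, which is the behaviour described above. With three simulated consumers, three distinct threads show up, which is what `concurrency: 3` is expected to produce.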

My application.yml looks as follows:

    spring:
      cloud:
        stream:
          default:
            group: defaultConsumers
            consumer:
              concurrency: 3
          bindings:
            jobTriggers:
              group: jobTriggerConsumers
              consumer:
                concurrency: 3
                max-attempts: 1
          solace:
            bindings:
              jobTriggers:
                consumer:
                  requeue-rejected: true

My pom.xml contains the following dependencies:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <!-- Dependency to Solace PubSub+ Spring Cloud Stream integration (binder) -->
    <dependency>
      <groupId>com.solace.spring.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-solace</artifactId>
      <version>2.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-cloud-connectors</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

Could this be an issue with the Solace PubSub+ binder? I have read that the behaviour of spring.cloud.stream.bindings.<name>.consumer.concurrency may depend on the binder's implementation.

What could be the issue here?

References:

# docker-compose -f PubSubStandard_singleNode.yml up
version: '3.3'

services:
  primary:
    container_name: pubSubStandardSingleNode
    image: solace/solace-pubsub-standard:latest
    shm_size: 1g
    ulimits:
      core: 1
      nofile:
        soft: 2448
        hard: 38048
    ports:
    #Port Mappings:  Ports are mapped straight through from host to
    #container.  This may result in port collisions on commonly used
    #ports that will cause failure of the container to start.
      #Web transport
      - '80:80'
      #Web transport over TLS
      - '443:443'
      #SEMP over TLS
      - '943:943'
      #MQTT Default VPN
      #- '1883:1883'
      #AMQP Default VPN over TLS
      - '5671:5671'
      #AMQP Default VPN
      - '5672:5672'
      #MQTT Default VPN over WebSockets
      #- '8000:8000'
      #MQTT Default VPN over WebSockets / TLS
      #- '8443:8443'
      #MQTT Default VPN over TLS
      #- '8883:8883'
      #SEMP / PubSub+ Manager
      - '8080:8080'
      #REST Default VPN
      #- '9000:9000'
      #REST Default VPN over TLS
      #- '9443:9443'
      #SMF
      - '55555:55555'
      #SMF Compressed
      #- '55003:55003'
      #SMF over TLS
      - '55443:55443'
    environment:
      - username_admin_globalaccesslevel=admin
      - username_admin_password=admin
      - system_scaling_maxconnectioncount=100

Solution

  • OK, I will answer the question myself.

    When I tried the configuration above with the RabbitMQ binder and a running RabbitMQ instance, concurrency worked just fine. So I assumed the Solace binder must be causing the problem.

    After some googling I indeed found confirmation: https://github.com/SolaceProducts/solace-spring-cloud/issues/7

    Apparently, concurrency is not currently supported by the Solace PubSub+ binder, which is a real bummer. At least the problem seems to be being worked on.

    There is also a community discussion here: https://solace.community/discussion/284/concurrency-property-with-solace-spring-cloud-stream-api#latest

    UPDATE

    This issue seems to have been fixed in version 2.1.1 of the Spring Cloud Stream Solace binder, i.e. when using this dependency:

    <dependency>
      <groupId>com.solace.spring.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-solace</artifactId>
      <version>2.1.1</version>
    </dependency>
    

    If you are using the Spring Cloud Solace BOM, you need at least version 1.1.1:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.solace.spring.cloud</groupId>
          <artifactId>solace-spring-cloud-bom</artifactId>
          <version>1.1.1</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    This works with Spring Cloud Hoxton.SR6.