spring-cloud, spring-rabbit, spring-cloud-stream

Spring Cloud Stream with Rabbit Binder - source/sink queue names don't match


Recently, I started to play with Spring Cloud Stream and the RabbitMQ binder.

If I understood everything correctly, when two services want to exchange messages, one should configure a source for sending and the other a sink for receiving, and both should use the same channel.

I have a channel named testchannel. I noticed, though, that the source created this RabbitMQ binding:

  • exchange testchannel,
  • routing key testchannel,
  • queue testchannel.default (durable),

while the sink created this RabbitMQ binding:

  • exchange testchannel,
  • routing key #,
  • queue testchannel.anonymous.RANDOM_ID (exclusive).

I omitted the prefix for brevity.

Now, when I run both applications, the first one sends a message to the testchannel exchange, which is then routed to both queues (I assume the routing key is testchannel). The second application consumes the message from the random queue, but the message in the default queue is never consumed.

My other problem is that the second app uses only a sink, yet it also creates a binding for an output channel, which is named output by default because I haven't specified anything.

I build both apps with the same Gradle script:

buildscript {
    ext {
        springBootVersion = '1.3.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/snapshot' }
    maven { url 'https://repo.spring.io/milestone' }
}

dependencies {
    compile(
            'org.springframework.cloud:spring-cloud-starter-stream-rabbit',
    )
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
    }
}

First app properties:

server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

First app source code:

@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}

Second app properties:

server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

Second app source code:

@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}

So my questions are:

  • Shouldn't the source and sink use the same channel and, as a result, the same broker queue? What is the proper configuration to achieve that? (My goal is to have multiple sink service instances, but only one should consume each message.)
  • Should the framework create an output binding when I am using only a sink? If so, how do I disable it?

Solution

  • By default, each consumer gets its own queue; it's a publish/subscribe scenario.

    There is a notion of a consumer group so you can have multiple instances compete for messages from the same queue.

    When binding the producer, a default queue is bound.

    If you wish to subscribe to the default group, you have to set the group:

    spring.cloud.stream.bindings.input.group=default
    

    If you don't provide a group, you get an exclusive, auto-delete queue.

    EDIT

    Since the default queue is durable, you should also set

    spring.cloud.stream.bindings.input.durableSubscription=true
    

    to avoid a warning when the consumer binds and to make sure the queue is durable if the consumer binds first and the queue doesn't exist yet.
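
Putting the two settings together, the second app's properties might look like the sketch below. It only combines the question's original configuration with the two properties from the answer. The property names are the ones used in this thread against a Brixton snapshot, so it is an assumption that they apply unchanged to other versions; check the reference for the release you run.

```properties
# Second app (sink). Sketch only: combines the question's settings
# with the two properties suggested in the answer.
server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel

# Join the "default" group so this instance consumes from the shared
# queue testchannel.default instead of an anonymous, exclusive queue.
spring.cloud.stream.bindings.input.group=default

# The default queue is durable, so declare the subscription as durable too.
spring.cloud.stream.bindings.input.durableSubscription=true

spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
```

With every sink instance started from this configuration, the instances should compete for messages on the one shared queue, so each message is handled by a single instance, which is the goal stated in the question.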