Tags: rabbitmq, spring-cloud-stream, spring-cloud-dataflow

One SCDF source, 2 processors, but only 1 processes each item


My use case is a variation on this:

Create Stream with one source, two parallel processors and one sink in Spring Cloud Data Flow

In that example, 1 source emits an item to RabbitMQ and both processors get it.

I want the opposite: the source emits items to RabbitMQ, but only 1 processor handles each item.

Let's pretend I have:

1 source named source
2 processors named processor1 and processor2

So source emits A, B, and C to RabbitMQ.

RabbitMQ will emit A

Whichever processor gets A first will process it. Let's say processor1 is the lucky one and handles A.

Then RabbitMQ will emit B

Since processor1 is busy with A and processor2 is idle, processor2 handles B

RabbitMQ will emit C

processor1 has finished with A and is idle, so processor1 handles C
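The behavior described above is the competing-consumers (work queue) pattern: one shared queue, several workers, and each item taken by whichever worker is free. Below is a minimal plain-Java sketch of that idea. It does not use RabbitMQ or Spring Cloud Stream; the class `WorkQueueDemo`, the `run` method, and the stop marker are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: an in-memory stand-in for one queue with two competing consumers.
class WorkQueueDemo {
    private static final String STOP = "__STOP__"; // hypothetical end-of-work marker

    // Returns item -> name of the worker that handled it (assumes distinct items).
    static Map<String, String> run(List<String> items, List<String> workerNames) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(items);
        for (int i = 0; i < workerNames.size(); i++) {
            queue.add(STOP); // one stop marker per worker, queued after all items
        }
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();
        for (String name : workerNames) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take(); // only one worker can take a given item
                        if (item.equals(STOP)) {
                            return;
                        }
                        handledBy.put(item, name);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : threads) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return handledBy;
    }

    public static void main(String[] args) {
        // Which worker gets which item depends on thread scheduling.
        System.out.println(run(List.of("A", "B", "C"), List.of("processor1", "processor2")));
    }
}
```

Every item ends up with exactly one worker, but which worker gets which item is not fixed, just as in the A, B, C walk-through.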

The Spring Cloud Data Flow graph I came up with is:

[Image: Spring Cloud Data Flow stream graph]

processorA is the one on top, processorB is the lower one

When I deploy this and run it, source emits A, B, and C, and then both processor1 and processor2 receive A, then B, and then C.

I'm not sure whether the behavior I want is something I can make happen in Spring Cloud Data Flow, or whether there is a RabbitMQ setting for it, as implied by the answer that says message removal

"is what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue."

If that's the case, can I set it in my Spring Cloud Data Flow source, or is it a RabbitMQ setting, or is it something else entirely?

UPDATE:

I have added

spring.cloud.stream.bindings.input.group=consumerGroup

to the application.properties file of my processor.

Unfortunately, both processors are receiving the exact same data.

Do I need to add a similar entry to the application.properties of my source?

Do I need to change the annotation on the processor? Currently, it is:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

Do I need to modify the annotation on the source in any fashion? Currently, it is:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = 
     @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))

Does the inclusion of @Poller change this in any fashion?

UPDATE:

Is the property named spring.cloud.stream.instanceCount?


Solution

  • For stream apps, you need to set the ...consumer.group property so they are both in the same group and compete for messages.

    But that should happen automatically with SCDF.
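To make the group idea concrete, here is a simplified plain-Java model of consumer-group delivery. It is a sketch under stated assumptions, not the binder's actual implementation: every group receives a copy of each message, and within a group each message goes to exactly one member. The class `ConsumerGroupModel`, the `deliver` method, and the round-robin choice of member are all hypothetical and chosen only to keep the output deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of consumer groups; it does not talk to RabbitMQ.
class ConsumerGroupModel {

    // groups: group name -> consumers in that group (each group assumed non-empty).
    // Returns consumer name -> messages that consumer received.
    static Map<String, List<String>> deliver(Map<String, List<String>> groups,
                                             List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (List<String> members : groups.values()) {
            for (String member : members) {
                received.put(member, new ArrayList<>());
            }
        }
        for (List<String> members : groups.values()) {
            for (int i = 0; i < messages.size(); i++) {
                // Each group sees every message; one member of the group gets it.
                // Round-robin is an assumption made for this illustration.
                String member = members.get(i % members.size());
                received.get(member).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("A", "B", "C");

        // Processors in different groups: each one receives everything.
        Map<String, List<String>> separate = new LinkedHashMap<>();
        separate.put("group1", List.of("processor1"));
        separate.put("group2", List.of("processor2"));
        System.out.println(deliver(separate, messages));
        // prints {processor1=[A, B, C], processor2=[A, B, C]}

        // Processors in the same group: they split the messages.
        Map<String, List<String>> shared = new LinkedHashMap<>();
        shared.put("consumerGroup", List.of("processor1", "processor2"));
        System.out.println(deliver(shared, messages));
        // prints {processor1=[A, C], processor2=[B]}
    }
}
```

The first case matches the symptom in the question (both processors see A, B, and C), which is what you would expect if the two processors do not actually end up in the same group on the same destination.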