My use case is a variation on this:
Create Stream with one source, two parallel processors and one sink in Spring Cloud Data Flow
In that example, one source emits an item to RabbitMQ and both processors receive it.
I want the opposite: the source emits items to RabbitMQ, but only one processor handles each item.
Let's pretend I have:
1 source named source
2 processors named processor1 and processor2
So source emits: A, B, C to rabbitmq
RabbitMQ will emit A
Whichever processor gets A first will process it. Let's say processor1 is the lucky one and handles A.
Then RabbitMQ will emit B
Since processor1 is busy with A and processor2 is idle, processor2 handles B
RabbitMQ will emit C
processor1 has finished with A and is idle, so processor1 handles C
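To make the desired behavior concrete, here is a minimal plain-Java sketch of competing consumers. It is only an in-memory illustration: a `BlockingQueue` stands in for the RabbitMQ queue, two threads stand in for processor1 and processor2, and the class name `CompetingConsumersSketch`, the `dispatch` method and the `STOP` marker are all hypothetical names invented for this example. It does not use Spring Cloud Stream or RabbitMQ.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CompetingConsumersSketch {

    // Hypothetical marker telling a worker to stop; one is enqueued per worker.
    private static final String STOP = "__STOP__";

    /** Returns item -> names of the workers that received it (one name each, if items are unique). */
    public static Map<String, List<String>> dispatch(List<String> items, List<String> workers) {
        // A single shared queue: taking an item removes it, so only one worker can get it.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, List<String>> receivedBy = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers.size());

        for (String worker : workers) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String item = queue.take(); // blocks until this worker is idle and an item exists
                        if (STOP.equals(item)) {
                            return;
                        }
                        receivedBy.computeIfAbsent(item, k -> new CopyOnWriteArrayList<>()).add(worker);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        queue.addAll(items);
        for (int i = 0; i < workers.size(); i++) {
            queue.add(STOP);
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return receivedBy;
    }

    public static void main(String[] args) {
        // Which processor handles which item varies from run to run.
        System.out.println(dispatch(List.of("A", "B", "C"), List.of("processor1", "processor2")));
    }
}
```

Each of A, B and C ends up with exactly one receiver, but which processor that is depends on thread timing.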
The Spring Cloud Data Flow graph I came up with is:
processorA is the one on top, processorB is the lower one
When I deploy this and run it, source emits A, B and C, and then both processor1 and processor2 receive A, B and then C.
I'm not sure whether the behavior I want is something I can make happen in Spring Cloud Data Flow, or whether there is a RabbitMQ setting for this, as implied by the answer that says message removal
"is what is happening when you set the auto-acknowledge flag. In that way, the message is acknowledged as soon as it's consumed - so gone from the queue."
If that's the case, can I set it in my Spring Cloud Data Flow source, or is it a RabbitMQ setting, or is it something else entirely?
UPDATE:
I have added
spring.cloud.stream.bindings.input.group=consumerGroup
to the application.properties file of my processor.
Unfortunately, both processors are receiving the exact same data.
Do I need to add a similar entry to the application.properties of my source?
Do I need to change the annotation on the processor? Currently, it is:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
Do I need to modify the annotation on the source in any fashion? Currently, it is:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller =
@Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
Does the inclusion of @Poller change this in any fashion?
UPDATE:
Is the property named spring.cloud.stream.instanceCount?
For stream apps, you need to set the ...consumer.group property so they are both in the same group and compete for messages.
But that should happen automatically with SCDF.
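To illustrate why being in the same group matters, here is a small plain-Java model of consumer-group delivery. It is a sketch under stated assumptions, not the binder's actual implementation: every group gets its own copy of each message, and members of one group take turns receiving from that copy. The round-robin order is an assumption of this model, and the class `ConsumerGroupModel`, the method `deliver` and the group names `groupA`/`groupB` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerGroupModel {

    /**
     * consumerToGroup maps consumer name -> group name (iteration order matters,
     * so pass a LinkedHashMap). Returns consumer name -> messages it received.
     */
    public static Map<String, List<String>> deliver(List<String> messages,
                                                    Map<String, String> consumerToGroup) {
        Map<String, List<String>> membersByGroup = new LinkedHashMap<>();
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : consumerToGroup.entrySet()) {
            membersByGroup.computeIfAbsent(e.getValue(), g -> new ArrayList<>()).add(e.getKey());
            received.put(e.getKey(), new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Every group sees every message...
            for (List<String> members : membersByGroup.values()) {
                // ...but only one member of the group receives it (round-robin in this model).
                String consumer = members.get(i % members.size());
                received.get(consumer).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("A", "B", "C");

        Map<String, String> differentGroups = new LinkedHashMap<>();
        differentGroups.put("processor1", "groupA");
        differentGroups.put("processor2", "groupB");
        // Both processors receive A, B and C.
        System.out.println(deliver(messages, differentGroups));

        Map<String, String> sameGroup = new LinkedHashMap<>();
        sameGroup.put("processor1", "consumerGroup");
        sameGroup.put("processor2", "consumerGroup");
        // processor1 receives A and C, processor2 receives B.
        System.out.println(deliver(messages, sameGroup));
    }
}
```

In this model, the symptom described in the question (both processors receive A, B and C) corresponds to the two processors ending up in different groups at runtime.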