I'm implememting a spring cloud data flow processor using PolledProcessor. I followed the example here https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers. Below is my code. I deployed a stream with a source piping to this processor (source | polled-processor) to scdf, and have the source published some messages. I confirm that the processor polls message from the scdf rabbitmq every second, but the result
is always false
. I went to the scdf rabbitmq console, I see those messages are all in the queue. So the processor is not polling the message although it keeps polling in the code. I also see there is no consumer for the queue. Looks like scdf did not bind this processor to the queue. Any idea why?
public interface PolledProcessor {
PollableMessageSource source();
MessageChannel dest();
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
public ApplicationRunner runner(PollableMessageSource source, MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(dest::send);
here is the status of the queue between the source and the processor
I've tested a Spring Cloud Stream app with no problems:
public class So69383266Application {
public static void main(String[] args) {
SpringApplication.run(So69383266Application.class, args);
public ApplicationRunner runner(PollableMessageSource source) {
return args -> {
while (true) {
boolean result = source.poll(System.out::println);
interface Polled {
PollableMessageSource source();
GenericMessage [payload=byte[6], headers={...
I suggest you set a breakpoint in AmqpMessageSource.doReceive()
to see what's going on.
Here's how to check that the source is consuming from the correct queue:
public ApplicationRunner runner(PollableMessageSource source) {
return args -> {
while (true) {
DirectFieldAccessor dfa = new DirectFieldAccessor(source);
boolean result = source.poll(System.out::println);