I'm implememting a spring cloud data flow processor using PolledProcessor. I followed the example here https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers. Below is my code. I deployed a stream with a source piping to this processor (source | polled-processor) to scdf, and have the source published some messages. I confirm that the processor polls message from the scdf rabbitmq every second, but the result
is always false
. I went to the scdf rabbitmq console, I see those messages are all in the queue. So the processor is not polling the message although it keeps polling in the code. I also see there is no consumer for the queue. Looks like scdf did not bind this processor to the queue. Any idea why?
public interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
@SpringBootApplication
@EnableBinding(PolledProcessor.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source, MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(dest::send);
Thread.sleep(1000);
}
};
}
}
here is the status of the queue between the source and the processor
I've tested a Spring Cloud Stream app with no problems:
@SpringBootApplication
@EnableBinding(Polled.class)
public class So69383266Application {
public static void main(String[] args) {
SpringApplication.run(So69383266Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source) {
return args -> {
while (true) {
boolean result = source.poll(System.out::println);
System.out.println(result);
Thread.sleep(1000);
}
};
}
}
interface Polled {
@Input
PollableMessageSource source();
}
false
GenericMessage [payload=byte[6], headers={...
true
false
I suggest you set a breakpoint in AmqpMessageSource.doReceive()
to see what's going on.
EDIT
Here's how to check that the source is consuming from the correct queue:
@Bean
public ApplicationRunner runner(PollableMessageSource source) {
return args -> {
while (true) {
DirectFieldAccessor dfa = new DirectFieldAccessor(source);
log.info(dfa.getPropertyValue("source.h.advised.targetSource.target.queue").toString());
boolean result = source.poll(System.out::println);
System.out.println(result);
Thread.sleep(1000);
}
};
}