spring-cloud-stream, spring-cloud-dataflow

Issue with PolledProcessor on Spring Cloud Data Flow


I'm implementing a Spring Cloud Data Flow (SCDF) processor using a PolledProcessor binding interface. I followed the example at https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers; my code is below. I deployed a stream to SCDF in which a source pipes into this processor (source | polled-processor) and had the source publish some messages. I confirmed that the processor calls poll() every second, but the result is always false. In the SCDF RabbitMQ console, all of the messages are still sitting in the queue, and the queue has no consumer. So although the code keeps polling, the processor never receives the messages; it looks like SCDF did not bind the processor to that queue. Any idea why?

public interface PolledProcessor {
    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();
}

@SpringBootApplication
@EnableBinding(PolledProcessor.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(PollableMessageSource source, MessageChannel dest) {
        return args -> {
            while (true) {
                boolean result = source.poll(dest::send);
                Thread.sleep(1000);
            }
        };
    }
}

Here is the status of the queue between the source and the processor:

[Screenshot of the queue in the RabbitMQ console; image not available]


Solution

  • I've tested a Spring Cloud Stream app with no problems:

    @SpringBootApplication
    @EnableBinding(Polled.class)
    public class So69383266Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So69383266Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(PollableMessageSource source) {
            return args -> {
                while (true) {
                    boolean result = source.poll(System.out::println);
                    System.out.println(result);
                    Thread.sleep(1000);
                }
            };
        }
    
    }
    
    interface Polled {
    
        @Input
        PollableMessageSource source();
    
    }
    
    Output:
    
    false
    GenericMessage [payload=byte[6], headers={...
    true
    false
    

    I suggest you set a breakpoint in AmqpMessageSource.doReceive() to see what's going on.
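The false / true / false pattern in the output above follows from the contract of poll: it returns true only when a message was actually received and handled. The sketch below mimics that contract with a plain in-memory queue. `InMemoryPollableSource` and `PollDemo` are hypothetical stand-ins written for illustration, not Spring Cloud Stream classes, and no broker is involved.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a pollable source (not a Spring class). */
final class InMemoryPollableSource {

    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    /** Simulates a producer publishing to the queue this source reads from. */
    void publish(String payload) {
        queue.add(payload);
    }

    /**
     * Hands at most one message to the handler.
     * Returns true if a message was handled, false if the queue was empty.
     */
    boolean poll(Consumer<String> handler) {
        String next = queue.poll();
        if (next == null) {
            return false;
        }
        handler.accept(next);
        return true;
    }
}

final class PollDemo {
    public static void main(String[] args) {
        InMemoryPollableSource polled = new InMemoryPollableSource();
        InMemoryPollableSource elsewhere = new InMemoryPollableSource();

        // A message published to a queue that nobody polls stays there.
        elsewhere.publish("stuck");
        System.out.println(polled.poll(System.out::println)); // false

        polled.publish("hello");
        System.out.println(polled.poll(System.out::println)); // prints hello, then true
        System.out.println(polled.poll(System.out::println)); // false
    }
}
```

In this simplified model, a poll that always returns false while messages pile up in some queue is what you would see if the source were reading from a different queue than the one being published to, which is why checking the actual queue name is a reasonable next step.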

    EDIT

    Here's how to check that the source is consuming from the correct queue:

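    @Bean
    public ApplicationRunner runner(PollableMessageSource source) {
        return args -> {
            while (true) {
                // Reflectively read the name of the queue that the underlying message source polls.
                // "log" is assumed to be a logger field declared on the enclosing class.
                DirectFieldAccessor dfa = new DirectFieldAccessor(source);
                log.info(dfa.getPropertyValue("source.h.advised.targetSource.target.queue").toString());
                // poll() returns true only when a message was received and handled.
                boolean result = source.poll(System.out::println);
                System.out.println(result);
                Thread.sleep(1000);
            }
        };
    }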